-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdatabase.go
170 lines (151 loc) · 5.03 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// A quickly mysql access component.
// Copyright 2023 The daog Authors. All rights reserved.
// Package daog, 是轻量级的数据库访问组件,它并不能称之为orm组件,仅仅提供了一组函数用以实现常用的数据库访问功能。
// 它是高性能的,与原生的使用sql包函数相比,没有性能损耗,这是因为,它并没有使用反射技术,而是使用编译技术把create table sql语句编译成daog需要的go代码。
// 它目前仅支持mysql。
//
// 设计思路来源于java的[orm框架sampleGenericDao](https://github.com/tiandarwin/simpleGenericDao)和protobuf的编译思路。之所以选择编译
// 而没有使用反射,是因为基于编译的抽象没有性能损耗。
package daog
import (
"context"
"database/sql"
"errors"
"github.com/rolandhe/daog/utils"
"log"
"strings"
"time"
)
var invalidShardingDatasourceKey error = errors.New("invalid shard key")
// DbConf 数据源配置, 包括数据库url和连接池相关配置,特别注意,它支持按数据源在日志中输出执行的sql
type DbConf struct {
// 数据库url
DbUrl string
// 最大连接数
Size int
// 连接的最大生命周期,单位是秒
Life int
// 最大空闲连接数
IdleCons int
// 最大空闲时间,单位是秒
IdleTime int
// 该在该数据源上执行sql是是否需要把待执行的sql输出到日志
LogSQL bool
// 读取连接超时时间,单位是秒
GetConnTimeout int64
}
// NewDatasource 按照配置创建单个数据源对象
func NewDatasource(conf *DbConf) (Datasource, error) {
dbUrl := conf.DbUrl
if -1 == strings.Index(conf.DbUrl, "interpolateParams") {
if strings.Index(conf.DbUrl, "?") != -1 {
dbUrl = dbUrl + "&interpolateParams=true"
} else {
dbUrl = dbUrl + "?interpolateParams=true"
}
}
db, err := sql.Open("mysql", dbUrl)
if err != nil {
log.Printf("goid=%d, %v\n", utils.QuickGetGoroutineId(), err)
return nil, err
}
if conf.Size > 0 {
db.SetMaxOpenConns(conf.Size)
}
if conf.IdleCons > 0 {
db.SetMaxIdleConns(conf.IdleCons)
}
if conf.IdleTime > 0 {
db.SetConnMaxIdleTime(time.Duration(int64(conf.IdleTime) * 1e9))
}
if conf.Life > 0 {
db.SetConnMaxLifetime(time.Duration(int64(conf.Life) * 1e9))
}
if conf.GetConnTimeout <= 0 {
conf.GetConnTimeout = 10
}
return &singleDatasource{db, conf.LogSQL, time.Second * time.Duration(conf.GetConnTimeout)}, nil
}
// NewShardingDatasource 创建多分片数据源,创建好的数据源是复合数据源,内含confs参数指定的多个数据源,也包含一个分片策略,
// 使用 NewShardingDatasource 数据源时要求使用 NewTransContextWithSharding 来创建事务上下文
func NewShardingDatasource(confs []*DbConf, policy DatasourceShardingPolicy) (Datasource, error) {
var dbs []Datasource
for _, conf := range confs {
ds, err := NewDatasource(conf)
if err != nil {
for _, sds := range dbs {
sds.Shutdown()
}
return nil, err
}
dbs = append(dbs, ds)
}
if len(dbs) == 0 {
return nil, errors.New("no db confs")
}
return &shardingDatasource{dbs, policy}, nil
}
// Datasource 描述一个数据源,确切的说是一个数据源分片,它对应一个mysql database
type Datasource interface {
getDB(ctx context.Context) *sql.DB
// Shutdown 关闭数据源
Shutdown()
// IsLogSQL 本数据源是否需要输出执行的sql到日志
IsLogSQL() bool
acquireConnTimeout() time.Duration
}
// DatasourceShardingPolicy 数据源分片策略
type DatasourceShardingPolicy interface {
// Shard 根据分片key和分片总数来路由分片数据源
Shard(shardKey any, count int) (int, error)
}
// ModInt64ShardingDatasourcePolicy 分片key是int64直接对分片总数取模路由策略,这是最简单的方式
type ModInt64ShardingDatasourcePolicy int64
func (h ModInt64ShardingDatasourcePolicy) Shard(shardKey any, count int) (int, error) {
key, ok := shardKey.(int64)
if !ok {
return 0, invalidShardingDatasourceKey
}
return int(key % int64(count)), nil
}
type singleDatasource struct {
db *sql.DB
logSQL bool
getConnTimeout time.Duration
}
func (db *singleDatasource) getDB(ctx context.Context) *sql.DB {
return db.db
}
func (db *singleDatasource) Shutdown() {
db.db.Close()
}
func (db *singleDatasource) IsLogSQL() bool {
return db.logSQL
}
func (db *singleDatasource) acquireConnTimeout() time.Duration {
return db.getConnTimeout
}
type shardingDatasource struct {
singleDatasource []Datasource
policy DatasourceShardingPolicy
}
func (db *shardingDatasource) getDB(ctx context.Context) *sql.DB {
key := getDatasourceShardingKeyFromCtx(ctx)
index, err := db.policy.Shard(key, len(db.singleDatasource))
if err != nil {
GLogger.Error(ctx, err)
return nil
}
return db.singleDatasource[index].getDB(ctx)
}
func (db *shardingDatasource) Shutdown() {
for _, sds := range db.singleDatasource {
sds.Shutdown()
}
}
func (db *shardingDatasource) IsLogSQL() bool {
return db.singleDatasource[0].IsLogSQL()
}
func (db *shardingDatasource) acquireConnTimeout() time.Duration {
return db.singleDatasource[0].acquireConnTimeout()
}