Skip to content

Commit c7b8770

Browse files
committed
support raft cluster
1 parent 50905bd commit c7b8770

12 files changed

+890
-31
lines changed

bitmaps.go

+45-7
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,30 @@ package basalt
22

33
import (
44
"encoding/binary"
5+
"fmt"
56
"io"
67
"sync"
78

89
"github.com/RoaringBitmap/roaring"
910
"github.com/smallnest/log"
1011
)
1112

13+
// OP bitmaps operations
14+
type OP byte
15+
16+
const (
17+
BmOpAdd OP = 1
18+
BmOpAddMany = 2
19+
BmOpRemove = 3
20+
BmOpDrop = 4
21+
BmOpClear = 5
22+
)
23+
1224
// Bitmaps contains all bitmaps of namespace.
1325
type Bitmaps struct {
14-
mu sync.RWMutex
15-
bitmaps map[string]*Bitmap
26+
mu sync.RWMutex
27+
bitmaps map[string]*Bitmap
28+
writeCallback func(op OP, value string)
1629
}
1730

1831
// NewBitmaps creates a Bitmaps.
@@ -29,7 +42,12 @@ type Bitmap struct {
2942
}
3043

3144
// Add adds a value.
32-
func (bs *Bitmaps) Add(name string, v uint32) {
45+
func (bs *Bitmaps) Add(name string, v uint32, callback bool) {
46+
if bs.writeCallback != nil && callback {
47+
bs.writeCallback(BmOpAdd, fmt.Sprintf("%s,%d", name, v))
48+
return
49+
}
50+
3351
bs.mu.Lock()
3452
bm := bs.bitmaps[name]
3553
if bm == nil {
@@ -46,7 +64,12 @@ func (bs *Bitmaps) Add(name string, v uint32) {
4664
}
4765

4866
// AddMany adds multiple values.
49-
func (bs *Bitmaps) AddMany(name string, v []uint32) {
67+
func (bs *Bitmaps) AddMany(name string, v []uint32, callback bool) {
68+
if bs.writeCallback != nil && callback {
69+
bs.writeCallback(BmOpAddMany, fmt.Sprintf("%s,%d", name, ints2str(v)))
70+
return
71+
}
72+
5073
bs.mu.Lock()
5174
bm := bs.bitmaps[name]
5275
if bm == nil {
@@ -63,7 +86,12 @@ func (bs *Bitmaps) AddMany(name string, v []uint32) {
6386
}
6487

6588
// Remove removes a value.
66-
func (bs *Bitmaps) Remove(name string, v uint32) {
89+
func (bs *Bitmaps) Remove(name string, v uint32, callback bool) {
90+
if bs.writeCallback != nil && callback {
91+
bs.writeCallback(BmOpRemove, fmt.Sprintf("%s,%d", name, v))
92+
return
93+
}
94+
6795
bs.mu.Lock()
6896
bm := bs.bitmaps[name]
6997
if bm == nil {
@@ -80,14 +108,24 @@ func (bs *Bitmaps) Remove(name string, v uint32) {
80108
}
81109

82110
// RemoveBitmap removes a bitmap.
83-
func (bs *Bitmaps) RemoveBitmap(name string) {
111+
func (bs *Bitmaps) RemoveBitmap(name string, callback bool) {
112+
if bs.writeCallback != nil && callback {
113+
bs.writeCallback(BmOpDrop, name)
114+
return
115+
}
116+
84117
bs.mu.Lock()
85118
delete(bs.bitmaps, name)
86119
bs.mu.Unlock()
87120
}
88121

89122
// ClearBitmap clear a bitmap.
90-
func (bs *Bitmaps) ClearBitmap(name string) {
123+
func (bs *Bitmaps) ClearBitmap(name string, callback bool) {
124+
if bs.writeCallback != nil && callback {
125+
bs.writeCallback(BmOpClear, name)
126+
return
127+
}
128+
91129
bs.mu.RLock()
92130
bm := bs.bitmaps[name]
93131
if bm == nil {

cmd/raft_server/README.md

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
## 分布式位图服务
2+
3+
基于raft的位图服务,可以搭建一个n个节点的raft集群,保证数据一致性。
4+
5+
位图服务是一个写少读多的服务,所以基于raft的实现可以满足性能的要求。
6+
7+
同时,考虑到位图服务的应用场景并不是严格强一致性的场景, 读操作并不基于raft的线性读或者lease read,而是保证最终一致性。
8+
9+
10+
### 测试集群
11+
12+
以三个节点的集群为例:
13+
14+
```
15+
basalt --id 1 --peers http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --addr :18972 --data bitmaps1.bdb
16+
basalt --id 2 --peers http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --addr :28972 --data bitmaps2.bdb
17+
basalt --id 3 --peers http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --addr :38972 --data bitmaps3.bdb
18+
```
19+
20+
21+
测试在第一个节点增加一个数据:
22+
```sh
23+
basalt git:(master) ✗ curl "http://127.0.0.1:18972/add/test/1000"
24+
```
25+
26+
在第二个节点检查这个数据是否存在,正常应该返回`200 OK`
27+
```
28+
➜ basalt git:(master) ✗ curl -v "http://127.0.0.1:28972/exists/test/1000"
29+
< HTTP/1.1 200 OK
30+
```

cmd/raft_server/basalt.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"log"
6+
"os"
7+
"strings"
8+
9+
"github.com/rpcxio/basalt"
10+
"github.com/rpcxio/etcd/raft/raftpb"
11+
)
12+
13+
var (
14+
addr = flag.String("addr", ":18972", "the listened address")
15+
dataFile = flag.String("data", "bitmaps.bdb", "the persisted file")
16+
17+
peers = flag.String("peers", "http://127.0.0.1:12379", "comma separated peers in a cluster")
18+
id = flag.Int("id", 1, "node ID")
19+
join = flag.Bool("join", false, "join an existing cluster")
20+
)
21+
22+
func main() {
23+
flag.Parse()
24+
25+
if _, err := os.Stat(*dataFile); os.IsNotExist(err) {
26+
f, err := os.Create(*dataFile)
27+
if err != nil {
28+
log.Fatalf("failed to create file %s: %v", *dataFile, err)
29+
}
30+
f.Close()
31+
}
32+
33+
// bitmap
34+
bitmaps := basalt.NewBitmaps()
35+
srv := basalt.NewServer(*addr, bitmaps, nil, *dataFile)
36+
37+
// raft
38+
proposeC := make(chan string)
39+
defer close(proposeC)
40+
confChangeC := make(chan raftpb.ConfChange)
41+
defer close(confChangeC)
42+
43+
var raftServer *basalt.RaftServer
44+
getSnapshot := func() ([]byte, error) { return raftServer.GetSnapshot() }
45+
commitC, errorC, snapshotterReady := basalt.NewRaftNode(*id, strings.Split(*peers, ","), *join, getSnapshot, proposeC, confChangeC)
46+
47+
raftServer = basalt.NewRaftServer(srv, <-snapshotterReady, proposeC, commitC, errorC)
48+
49+
if err := srv.Serve(); err != nil {
50+
log.Fatalf("failed to start basalt services:%v", err)
51+
}
52+
srv.Close()
53+
}

go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
module github.com/rpcxio/basalt
22

3-
go 1.13
3+
go 1.14
44

55
require (
66
github.com/RoaringBitmap/roaring v0.4.21
77
github.com/go-redis/redis v6.15.7+incompatible
88
github.com/julienschmidt/httprouter v1.3.0
9+
github.com/rpcxio/etcd v0.0.0-20200729120139-f9cde972fd94
910
github.com/smallnest/log v0.0.0-20190128090703-5dc5752d8772
1011
github.com/smallnest/rpcx v0.0.0-20200213044823-78d7a4d32e2a
1112
github.com/soheilhy/cmux v0.1.4
1213
github.com/tidwall/redcon v1.2.1
13-
golang.org/x/text v0.3.2
14+
go.uber.org/zap v1.14.1
1415
)

0 commit comments

Comments
 (0)