-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathmerge_operator.go
177 lines (153 loc) · 6.88 KB
/
merge_operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package grocksdb
// #include "rocksdb/c.h"
import "C"
import "unsafe"
// A MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
//
// Please read the RocksDB documentation <http://rocksdb.org/> for
// more details and example implementations.
type MergeOperator interface {
// Gives the client a way to express the read -> modify -> write semantics
// key: The key that's associated with this merge operation.
// Client could multiplex the merge operator based on it
// if the key space is partitioned and different subspaces
// refer to different types of data which have different
// merge operation semantics.
// existingValue: null indicates that the key does not exist before this op.
// operands: the sequence of merge operations to apply, front() first.
//
// Return true on success.
//
// All values passed in will be client-specific values. So if this method
// returns false, it is because client specified bad data or there was
// internal corruption. This will be treated as an error by the library.
FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool)
// The name of the MergeOperator.
Name() string
}
// PartialMerger implements PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, err)
// When a MergeOperator implements this interface, PartialMerge will be called in addition
// to FullMerge for compactions across levels
type PartialMerger interface {
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types
// that you would have passed to a db.Merge() call in the same order
// (i.e.: db.Merge(key,left_op), followed by db.Merge(key,right_op)).
//
// PartialMerge should combine them into a single merge operation.
// The return value should be constructed such that a call to
// db.Merge(key, new_value) would yield the same result as a call
// to db.Merge(key, left_op) followed by db.Merge(key, right_op).
//
// If it is impossible or infeasible to combine the two operations, return false.
// The library will internally keep track of the operations, and apply them in the
// correct order once a base-value (a Put/Delete/End-of-Database) is seen.
PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool)
}
// MultiMerger implements PartialMergeMulti(key []byte, operands [][]byte) ([]byte, err)
// When a MergeOperator implements this interface, PartialMergeMulti will be called in addition
// to FullMerge for compactions across levels
type MultiMerger interface {
// PartialMerge performs merge on multiple operands
// when all of the operands are themselves merge operation types
// that you would have passed to a db.Merge() call in the same order
// (i.e.: db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]),
// ... db.Merge(key, operand[n])).
//
// PartialMerge should combine them into a single merge operation.
// The return value should be constructed such that a call to
// db.Merge(key, new_value) would yield the same result as a call
// to db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]),
// ... db.Merge(key, operand[n])).
//
// If it is impossible or infeasible to combine the operations, return false.
// The library will internally keep track of the operations, and apply them in the
// correct order once a base-value (a Put/Delete/End-of-Database) is seen.
PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool)
// Destroy pointer/underlying data
Destroy()
}
// NewNativeMergeOperator creates a MergeOperator object.
func NewNativeMergeOperator(c unsafe.Pointer) MergeOperator {
return &nativeMergeOperator{c: (*C.rocksdb_mergeoperator_t)(c)}
}
type nativeMergeOperator struct {
c *C.rocksdb_mergeoperator_t
}
func (mo *nativeMergeOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) {
return nil, false
}
func (mo *nativeMergeOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) {
return nil, false
}
func (mo *nativeMergeOperator) Name() string { return "" }
func (mo *nativeMergeOperator) Destroy() {
C.rocksdb_mergeoperator_destroy(mo.c)
mo.c = nil
}
// Hold references to merge operators.
var mergeOperators = NewCOWList()
type mergeOperatorWrapper struct {
name *C.char
mergeOperator MergeOperator
}
func registerMergeOperator(merger MergeOperator) int {
return mergeOperators.Append(mergeOperatorWrapper{C.CString(merger.Name()), merger})
}
//export gorocksdb_mergeoperator_full_merge
func gorocksdb_mergeoperator_full_merge(idx int, cKey *C.char, cKeyLen C.size_t, cExistingValue *C.char, cExistingValueLen C.size_t, cOperands **C.char, cOperandsLen *C.size_t, cNumOperands C.int, cSuccess *C.uchar, cNewValueLen *C.size_t) *C.char {
key := refCBytes(cKey, cKeyLen)
rawOperands := charSlice(cOperands, cNumOperands)
operandsLen := sizeSlice(cOperandsLen, cNumOperands)
existingValue := refCBytes(cExistingValue, cExistingValueLen)
operands := make([][]byte, int(cNumOperands))
for i, len := range operandsLen {
operands[i] = refCBytes(rawOperands[i], len)
}
newValue, success := mergeOperators.Get(idx).(mergeOperatorWrapper).mergeOperator.FullMerge(key, existingValue, operands)
newValueLen := len(newValue)
*cNewValueLen = C.size_t(newValueLen)
*cSuccess = boolToChar(success)
return cByteSlice(newValue)
}
//export gorocksdb_mergeoperator_partial_merge_multi
func gorocksdb_mergeoperator_partial_merge_multi(idx int, cKey *C.char, cKeyLen C.size_t, cOperands **C.char, cOperandsLen *C.size_t, cNumOperands C.int, cSuccess *C.uchar, cNewValueLen *C.size_t) *C.char {
key := refCBytes(cKey, cKeyLen)
rawOperands := charSlice(cOperands, cNumOperands)
operandsLen := sizeSlice(cOperandsLen, cNumOperands)
operands := make([][]byte, int(cNumOperands))
for i, len := range operandsLen {
operands[i] = refCBytes(rawOperands[i], len)
}
var newValue []byte
success := true
merger := mergeOperators.Get(idx).(mergeOperatorWrapper).mergeOperator
// check if this MergeOperator supports partial or multi merges
switch v := merger.(type) {
case MultiMerger:
newValue, success = v.PartialMergeMulti(key, operands)
case PartialMerger:
leftOperand := operands[0]
for i := 1; i < int(cNumOperands); i++ {
newValue, success = v.PartialMerge(key, leftOperand, operands[i])
if !success {
break
}
leftOperand = newValue
}
default:
success = false
}
newValueLen := len(newValue)
*cNewValueLen = C.size_t(newValueLen)
*cSuccess = boolToChar(success)
return cByteSlice(newValue)
}
//export gorocksdb_mergeoperator_name
func gorocksdb_mergeoperator_name(idx int) *C.char {
return mergeOperators.Get(idx).(mergeOperatorWrapper).name
}