@@ -26,8 +26,8 @@ type BufferPool struct {
26
26
keyValuesStartPoint uint64
27
27
maxKeys uint64
28
28
redundantBlocks uint16
29
- kvBuffers []Buffer // this will act as a FIFO
30
- indexBuffers [] Buffer // this will act as a min-slice where the buffer with the biggest left-offset is replaced
29
+ kvBuffers []* Buffer // this will act as a FIFO
30
+ indexBuffers map [ uint64 ] * Buffer
31
31
File * os.File
32
32
FilePath string
33
33
FileSize uint64
@@ -94,8 +94,8 @@ func NewBufferPool(capacity *uint64, filePath string, maxKeys *uint64, redundant
94
94
keyValuesStartPoint : header .KeyValuesStartPoint ,
95
95
maxKeys : header .MaxKeys ,
96
96
redundantBlocks : header .RedundantBlocks ,
97
- kvBuffers : make ([]Buffer , 0 , kvCap ),
98
- indexBuffers : make ([] Buffer , 0 , indexCap ),
97
+ kvBuffers : make ([]* Buffer , 0 , kvCap ),
98
+ indexBuffers : make (map [ uint64 ] * Buffer , indexCap ),
99
99
File : file ,
100
100
FilePath : filePath ,
101
101
FileSize : fileSize ,
@@ -118,7 +118,7 @@ func (bp *BufferPool) Append(data []byte) (uint64, error) {
118
118
start := len (bp .kvBuffers ) - 1
119
119
for i := start ; i >= 0 ; i -- {
120
120
// make sure you get the pointer
121
- buf := & bp .kvBuffers [i ]
121
+ buf := bp .kvBuffers [i ]
122
122
if buf .CanAppend (bp .FileSize ) {
123
123
// write the data to buffer
124
124
addr := buf .Append (data )
@@ -157,13 +157,12 @@ func (bp *BufferPool) UpdateIndex(addr uint64, data []byte) error {
157
157
return err
158
158
}
159
159
160
- for i := 0 ; i < len (bp .indexBuffers ); i ++ {
161
- buf := & bp .indexBuffers [i ]
162
- if buf .Contains (addr ) {
163
- err = buf .Replace (addr , data )
164
- if err != nil {
165
- return err
166
- }
160
+ blockLeftOffset := bp .getBlockLeftOffset (addr , entries .HeaderSizeInBytes )
161
+ buf , ok := bp .indexBuffers [blockLeftOffset ]
162
+ if ok {
163
+ err = buf .Replace (addr , data )
164
+ if err != nil {
165
+ return err
167
166
}
168
167
}
169
168
@@ -180,8 +179,8 @@ func (bp *BufferPool) ClearFile() error {
180
179
return err
181
180
}
182
181
bp .FileSize = uint64 (fileSize )
183
- bp .indexBuffers = make ([] Buffer , 0 , bp .indexCapacity )
184
- bp .kvBuffers = make ([] Buffer , 0 , bp .kvCapacity )
182
+ bp .indexBuffers = make (map [ uint64 ] * Buffer , bp .indexCapacity )
183
+ bp .kvBuffers = bp .kvBuffers [: 0 ]
185
184
return nil
186
185
}
187
186
@@ -272,8 +271,8 @@ func (bp *BufferPool) CompactFile() error {
272
271
}
273
272
274
273
// clean up the buffers and update metadata
275
- bp .kvBuffers = make ([] Buffer , 0 , bp .kvCapacity )
276
- bp .indexBuffers = make ([] Buffer , 0 , bp .indexCapacity )
274
+ bp .kvBuffers = bp .kvBuffers [: 0 ]
275
+ bp .indexBuffers = make (map [ uint64 ] * Buffer , bp .indexCapacity )
277
276
bp .File = newFile
278
277
bp .FileSize = uint64 (newFileOffset )
279
278
@@ -298,7 +297,7 @@ func (bp *BufferPool) GetValue(kvAddress uint64, key []byte) (*Value, error) {
298
297
// since the latest kv_buffers are the ones updated when new changes occur
299
298
kvBufLen := len (bp .kvBuffers )
300
299
for i := kvBufLen - 1 ; i >= 0 ; i -- {
301
- buf := & bp .kvBuffers [i ]
300
+ buf := bp .kvBuffers [i ]
302
301
if buf .Contains (kvAddress ) {
303
302
return buf .GetValue (kvAddress , key )
304
303
}
@@ -316,7 +315,7 @@ func (bp *BufferPool) GetValue(kvAddress uint64, key []byte) (*Value, error) {
316
315
}
317
316
318
317
// update kv_buffers only up to the actual data read (to cater for a partially filled buffer)
319
- bp .kvBuffers = append (bp .kvBuffers , * NewBuffer (kvAddress , buf [:bytesRead ], bp .bufferSize ))
318
+ bp .kvBuffers = append (bp .kvBuffers , NewBuffer (kvAddress , buf [:bytesRead ], bp .bufferSize ))
320
319
entry , err := entries .ExtractKeyValueEntryFromByteArray (buf , 0 )
321
320
if err != nil {
322
321
return nil , err
@@ -338,7 +337,7 @@ func (bp *BufferPool) TryDeleteKvEntry(kvAddress uint64, key []byte) (bool, erro
338
337
// since the latest kv_buffers are the ones updated when new changes occur
339
338
kvBufLen := len (bp .kvBuffers )
340
339
for i := kvBufLen - 1 ; i >= 0 ; i -- {
341
- buf := & bp .kvBuffers [i ]
340
+ buf := bp .kvBuffers [i ]
342
341
if buf .Contains (kvAddress ) {
343
342
success , err := buf .TryDeleteKvEntry (kvAddress , key )
344
343
if err != nil {
@@ -367,6 +366,8 @@ func (bp *BufferPool) TryDeleteKvEntry(kvAddress uint64, key []byte) (bool, erro
367
366
if err != nil {
368
367
return false , err
369
368
}
369
+
370
+ return true , nil
370
371
}
371
372
372
373
return false , nil
@@ -386,7 +387,7 @@ func (bp *BufferPool) AddrBelongsToKey(kvAddress uint64, key []byte) (bool, erro
386
387
// since the latest kv_buffers are the ones updated when new changes occur
387
388
kvBufLen := len (bp .kvBuffers )
388
389
for i := kvBufLen - 1 ; i >= 0 ; i -- {
389
- buf := & bp .kvBuffers [i ]
390
+ buf := bp .kvBuffers [i ]
390
391
if buf .Contains (kvAddress ) {
391
392
return buf .AddrBelongsToKey (kvAddress , key )
392
393
}
@@ -404,7 +405,7 @@ func (bp *BufferPool) AddrBelongsToKey(kvAddress uint64, key []byte) (bool, erro
404
405
}
405
406
406
407
// update kv_buffers only up to the actual data read (to cater for a partially filled buffer)
407
- bp .kvBuffers = append (bp .kvBuffers , * NewBuffer (kvAddress , buf [:bytesRead ], bp .bufferSize ))
408
+ bp .kvBuffers = append (bp .kvBuffers , NewBuffer (kvAddress , buf [:bytesRead ], bp .bufferSize ))
408
409
409
410
keyInFile := buf [entries .OffsetForKeyInKVArray : entries .OffsetForKeyInKVArray + uint64 (len (key ))]
410
411
isForKey := bytes .Contains (keyInFile , key )
@@ -421,48 +422,47 @@ func (bp *BufferPool) ReadIndex(addr uint64) ([]byte, error) {
421
422
return nil , err
422
423
}
423
424
424
- size := entries .IndexEntrySizeInBytes
425
- // starts from buffer with lowest left_offset, which I expect to have more keys
426
- idxBufLen := len (bp .indexBuffers )
427
- for i := 0 ; i < idxBufLen ; i ++ {
428
- buf := & bp .indexBuffers [i ]
429
- if buf .Contains (addr ) {
430
- return buf .ReadAt (addr , size )
431
- }
425
+ blockLeftOffset := bp .getBlockLeftOffset (addr , entries .HeaderSizeInBytes )
426
+ buf , ok := bp .indexBuffers [blockLeftOffset ]
427
+ if ok {
428
+ return buf .ReadAt (addr , entries .IndexEntrySizeInBytes )
432
429
}
433
430
434
- buf := make ([]byte , bp .bufferSize )
435
- bytesRead , err := bp .File .ReadAt (buf , int64 (addr ))
431
+ data := make ([]byte , bp .bufferSize )
432
+ // Index buffers should have preset boundaries matching the half-open ranges
433
+ // [StartOfIndex, StartOfIndex + BlockSize),
434
+ // [StartOfIndex + BlockSize, StartOfIndex + (2*BlockSize)),
435
+ // [StartOfIndex + (2*BlockSize), StartOfIndex + (3*BlockSize)) ...
436
+ _ , err = bp .File .ReadAt (data , int64 (blockLeftOffset ))
436
437
if err != nil && ! errors .Is (err , io .EOF ) {
437
438
return nil , err
438
439
}
439
440
440
- // update kv_buffers only upto actual data read (cater for partially filled buffer)
441
- newIdxBuf := NewBuffer (addr , buf [:bytesRead ], bp .bufferSize )
442
-
443
- if uint64 (idxBufLen ) >= bp .indexCapacity {
444
- // we wish to remove the last buf, and replace it with the new one
445
- // but maintain the ascending order of LeftOffsets
446
- // we will start at the second-last buf and move towards the front of the slice
447
- for i := idxBufLen - 2 ; i >= 0 ; i -- {
448
- currOffset := (& bp .indexBuffers [i ]).LeftOffset
449
- if currOffset < newIdxBuf .LeftOffset || i == 0 {
450
- // copy the new buffer in previous position and stop if
451
- // the current buffer has a lower left offset or if we
452
- // have reached the end of the array
453
- bp .indexBuffers [i + 1 ] = * newIdxBuf
454
- break
455
- } else {
456
- // move current buf backwards
457
- bp .indexBuffers [i + 1 ] = bp .indexBuffers [i ]
441
+ if uint64 (len (bp .indexBuffers )) >= bp .indexCapacity {
442
+ biggestLeftOffset := uint64 (0 )
443
+ for lftOffset , _ := range bp .indexBuffers {
444
+ if lftOffset >= biggestLeftOffset {
445
+ biggestLeftOffset = lftOffset
458
446
}
459
447
}
448
+
449
+ // delete the buffer with the biggest left offset as those with lower left offsets
450
+ // are expected to have more keys
451
+ delete (bp .indexBuffers , biggestLeftOffset )
452
+ bp .indexBuffers [blockLeftOffset ] = NewBuffer (blockLeftOffset , data , bp .bufferSize )
460
453
} else {
461
- // Just append
462
- bp .indexBuffers = append (bp .indexBuffers , * newIdxBuf )
454
+ bp .indexBuffers [blockLeftOffset ] = NewBuffer (blockLeftOffset , data , bp .bufferSize )
463
455
}
464
456
465
- return buf [:size ], nil
457
+ start := addr - blockLeftOffset
458
+ return data [start : start + entries .IndexEntrySizeInBytes ], nil
459
+ }
460
+
461
+ // getBlockLeftOffset returns the left offset for the block in which the address is to be found
462
+ func (bp * BufferPool ) getBlockLeftOffset (addr uint64 , minOffset uint64 ) uint64 {
463
+ blockPosition := (addr - minOffset ) / bp .bufferSize
464
+ blockLeftOffset := (blockPosition * bp .bufferSize ) + minOffset
465
+ return blockLeftOffset
466
466
}
467
467
468
468
// Eq checks that other is equal to bp
@@ -482,13 +482,13 @@ func (bp *BufferPool) Eq(other *BufferPool) bool {
482
482
}
483
483
484
484
for i , buf := range bp .kvBuffers {
485
- if ! buf .Eq (& other .kvBuffers [i ]) {
485
+ if ! buf .Eq (other .kvBuffers [i ]) {
486
486
return false
487
487
}
488
488
}
489
489
490
490
for i , buf := range bp .indexBuffers {
491
- if ! buf .Eq (& other .indexBuffers [i ]) {
491
+ if ! buf .Eq (other .indexBuffers [i ]) {
492
492
return false
493
493
}
494
494
}
0 commit comments