Skip to content

Commit

Permalink
refactor: improve code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Aug 1, 2024
1 parent d5aa4f6 commit f9bdeb0
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 110 deletions.
196 changes: 86 additions & 110 deletions dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,6 @@ import (
// Loader is the function type for loading data
type Loader[K comparable, V any] func(context.Context, []K) []Result[V]

// config holds the configuration for DataLoader
type config struct {
// BatchSize is the number of keys to batch together, Default is 100
BatchSize int
// Wait is the duration to wait before processing a batch, Default is 16ms
Wait time.Duration
// CacheSize is the size of the cache, Default is 1024
CacheSize int
// CacheExpire is the duration to expire cache items, Default is 1 minute
CacheExpire time.Duration
}

// dataLoader is the main struct for the dataloader
type dataLoader[K comparable, V any] struct {
loader Loader[K, V]
cache *expirable.LRU[K, V]
config config
mu sync.Mutex
batch []K
chs map[K][]chan Result[V]
stopSchedule chan struct{}
}

// Interface is a `DataLoader` Interface which defines a public API for loading data from a particular
// data back-end with unique keys such as the `id` column of a SQL table or
// document name in a MongoDB database, given a batch loading function.
Expand All @@ -60,6 +37,29 @@ type Interface[K comparable, V any] interface {
Prime(ctx context.Context, key K, value V) Interface[K, V]
}

// config holds the configuration for DataLoader
type config struct {
// BatchSize is the number of keys to batch together, Default is 100
BatchSize int
// Wait is the duration to wait before processing a batch, Default is 16ms
Wait time.Duration
// CacheSize is the size of the cache, Default is 1024
CacheSize int
// CacheExpire is the duration to expire cache items, Default is 1 minute
CacheExpire time.Duration
}

// dataLoader is the main struct for the dataloader
type dataLoader[K comparable, V any] struct {
loader Loader[K, V]
cache *expirable.LRU[K, V]
config config
mu sync.Mutex
batch []K
chs map[K][]chan Result[V]
stopSchedule chan struct{}
}

// New creates a new DataLoader with the given loader function and options
func New[K comparable, V any](loader Loader[K, V], options ...Option) Interface[K, V] {
config := config{
Expand Down Expand Up @@ -88,29 +88,68 @@ func New[K comparable, V any](loader Loader[K, V], options ...Option) Interface[
return dl
}

// Option is a function type for configuring DataLoader
type Option func(*config)
// Load loads a single key
func (d *dataLoader[K, V]) Load(ctx context.Context, key K) Result[V] {
return <-d.goLoad(ctx, key)
}

// LoadMany loads multiple keys
func (d *dataLoader[K, V]) LoadMany(ctx context.Context, keys []K) []Result[V] {
chs := make([]<-chan Result[V], len(keys))
for i, key := range keys {
chs[i] = d.goLoad(ctx, key)
}

results := make([]Result[V], len(keys))
for i, ch := range chs {
results[i] = <-ch
}

return results
}

// LoadMap loads multiple keys and returns a map of results
func (d *dataLoader[K, V]) LoadMap(ctx context.Context, keys []K) map[K]Result[V] {
chs := make([]<-chan Result[V], len(keys))
for i, key := range keys {
chs[i] = d.goLoad(ctx, key)
}

results := make(map[K]Result[V], len(keys))
for i, ch := range chs {
results[keys[i]] = <-ch
}

// WithCache sets the cache size for the DataLoader
func WithCache(size int, expire time.Duration) Option {
return func(c *config) {
c.CacheSize = size
c.CacheExpire = expire
return results
}

// Clear removes an item from the cache
func (d *dataLoader[K, V]) Clear(key K) Interface[K, V] {
if d.cache != nil {
d.cache.Remove(key)
}

return d
}

// WithBatchSize sets the batch size for the DataLoader
func WithBatchSize(size int) Option {
return func(c *config) {
c.BatchSize = size
// ClearAll clears the entire cache
func (d *dataLoader[K, V]) ClearAll() Interface[K, V] {
if d.cache != nil {
d.cache.Purge()
}

return d
}

// WithWait sets the wait duration for the DataLoader
func WithWait(wait time.Duration) Option {
return func(c *config) {
c.Wait = wait
// Prime primes the cache with a key and value
func (d *dataLoader[K, V]) Prime(ctx context.Context, key K, value V) Interface[K, V] {
if d.cache != nil {
if _, ok := d.cache.Get(key); ok {
d.cache.Add(key, value)
}
}

return d
}

// goLoad loads a single key asynchronously
Expand Down Expand Up @@ -160,47 +199,6 @@ func (d *dataLoader[K, V]) goLoad(ctx context.Context, key K) <-chan Result[V] {
return ch
}

// Load loads a single key
func (d *dataLoader[K, V]) Load(ctx context.Context, key K) Result[V] {
return <-d.goLoad(ctx, key)
}

// LoadMany loads multiple keys
func (d *dataLoader[K, V]) LoadMany(ctx context.Context, keys []K) []Result[V] {
chs := make([]<-chan Result[V], len(keys))
for i, key := range keys {
chs[i] = d.goLoad(ctx, key)
}

results := make([]Result[V], len(keys))
for i, ch := range chs {
results[i] = <-ch
}

return results
}

// LoadMap loads multiple keys and returns a map of results
func (d *dataLoader[K, V]) LoadMap(ctx context.Context, keys []K) map[K]Result[V] {
chs := make([]<-chan Result[V], len(keys))
for i, key := range keys {
chs[i] = d.goLoad(ctx, key)
}

results := make(map[K]Result[V], len(keys))
for i, ch := range chs {
results[keys[i]] = <-ch
}

return results
}

// reset resets the DataLoader
func (d *dataLoader[K, V]) reset() {
d.batch = make([]K, 0, d.config.BatchSize)
d.chs = make(map[K][]chan Result[V], d.config.BatchSize)
}

// scheduleBatch schedules a batch to be processed
func (d *dataLoader[K, V]) scheduleBatch(ctx context.Context, stopSchedule <-chan struct{}) {
select {
Expand All @@ -223,11 +221,12 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, chs map[K
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
fmt.Fprintf(os.Stderr, "Dataloader: Panic received in loader function: %v\n%s", r, buf)
err := fmt.Errorf("dataloader: panic received in loader function: %v", r)
fmt.Fprintf(os.Stderr, "%v\n%s", err, buf)

for _, chs := range chs {
for _, ch := range chs {
ch <- Result[V]{err: fmt.Errorf("panic received in loader function: %v", r)}
ch <- Result[V]{err: err}
close(ch)
}
}
Expand All @@ -245,39 +244,16 @@ func (d *dataLoader[K, V]) processBatch(ctx context.Context, keys []K, chs map[K
}
}

// reset resets the DataLoader
func (d *dataLoader[K, V]) reset() {
d.batch = make([]K, 0, d.config.BatchSize)
d.chs = make(map[K][]chan Result[V], d.config.BatchSize)
}

// sendResult sends a result to channels
func sendResult[V any](chs []chan Result[V], result Result[V]) {
for _, ch := range chs {
ch <- result
close(ch)
}
}

// Clear removes an item from the cache
func (d *dataLoader[K, V]) Clear(key K) Interface[K, V] {
if d.cache != nil {
d.cache.Remove(key)
}

return d
}

// ClearAll clears the entire cache
func (d *dataLoader[K, V]) ClearAll() Interface[K, V] {
if d.cache != nil {
d.cache.Purge()
}

return d
}

// Prime primes the cache with a key and value
func (d *dataLoader[K, V]) Prime(ctx context.Context, key K, value V) Interface[K, V] {
if d.cache != nil {
if _, ok := d.cache.Get(key); ok {
d.cache.Add(key, value)
}
}

return d
}
28 changes: 28 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package dataloader

import "time"

// Option is a function type for configuring DataLoader
type Option func(*config)

// WithCache sets the cache size for the DataLoader
func WithCache(size int, expire time.Duration) Option {
return func(c *config) {
c.CacheSize = size
c.CacheExpire = expire
}
}

// WithBatchSize sets the batch size for the DataLoader
func WithBatchSize(size int) Option {
return func(c *config) {
c.BatchSize = size
}
}

// WithWait sets the wait duration for the DataLoader
func WithWait(wait time.Duration) Option {
return func(c *config) {
c.Wait = wait
}
}

0 comments on commit f9bdeb0

Please sign in to comment.