From 94e3f108d0bfade483fc2d9e5caaf7ee66e22c12 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 23 Nov 2020 22:15:50 -0600 Subject: [PATCH] updates/fixes --- docs/architecture/adr-038-state-listening.md | 80 +++++++++++++------- 1 file changed, 51 insertions(+), 29 deletions(-) diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index c4eaadd0a688..7e83957b3559 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -10,7 +10,7 @@ Proposed ## Abstract -This ADR defines a set of changes to enable state change listening of individual KVStores. +This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers. ## Context @@ -19,8 +19,8 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision -We will modify the MultiStore interface and its concrete (`basemulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores and routing the output to consumers. -We will also introduce two approaches for exposing the data to consumers: writing to files and writing to a gRPC stream. +We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to specific state changes in underlying KVStores. +We will also introduce two approaches for exposing the data to external consumers: writing to files and writing to a gRPC stream. ### Listening interface In a new file- `store/types/listening.go`- we will create a `Listening` interface for streaming out an allowed subset of state changes from a KVStore. @@ -128,12 +128,12 @@ which direct the streaming of only certain allowed subsets of keys and/or operat // underlying listeners with the proper key and operation permissions type Store struct { parent types.KVStore - listeners []types.Listener + listeners []types.Listening } // NewStore returns a reference to a new traceKVStore given a parent // KVStore implementation and a buffered writer. -func NewStore(parent types.KVStore, listeners []types.Listener) *Store { +func NewStore(parent types.KVStore, listeners []types.Listening) *Store { return &Store{parent: parent, listeners: listeners} } @@ -143,7 +143,6 @@ func NewStore(parent types.KVStore, listeners []types.Listener) *Store { // delegates a Get call to the parent KVStore. func (tkv *Store) Get(key []byte) []byte { value := tkv.parent.Get(key) - writeOperation(tkv.listeners, types.ReadOp, key, value) return value } @@ -174,7 +173,11 @@ func (tkv *Store) Has(key []byte) bool { // writeOperation writes a KVStore operation to the underlying io.Writer of // every listener that has permissions to listen to that operation at the given key // The TraceOperation is JSON-encoded with the `key` and `value` fields as base64 encoded strings -func writeOperation(listeners []types.Listener, op types.Operation, key, value []byte) { +func writeOperation(listeners []types.Listening, op types.Operation, key, value []byte) { + // short circuit if there are no listeners so we don't waste time base64 encoding `key` and `value` + if len(listeners) == 0 { + return + } traceOp := types.TraceOperation{ Operation: op, Key: base64.StdEncoding.EncodeToString(key), @@ -184,15 +187,15 @@ func writeOperation(listeners []types.Listener, op types.Operation, key, value [ if !l.Allowed(op, key) { continue } - traceOp.Metadata = l.Context + traceOp.Metadata = l.GetContext() raw, err := json.Marshal(traceOp) if err != nil { panic(errors.Wrap(err, "failed to serialize listen operation")) } - if _, err := l.Writer.Write(raw); err != nil { + if _, err := l.Write(raw); err != nil { panic(errors.Wrap(err, "failed to write listen operation")) } - io.WriteString(l.Writer, "\n") + io.WriteString(l, "\n") } } ``` @@ -210,7 +213,7 @@ type MultiStore interface { ListeningEnabled(key StoreKey) bool // SetListeners sets the listener set for the KVStore belonging to the provided StoreKey - SetListeners(key StoreKey, listeners []Listener) + SetListeners(key StoreKey, listeners []Listening) // CacheListening enables or disables KVStore listening at the cache layer CacheListening(listen bool) @@ -219,23 +222,23 @@ type MultiStore interface { ```go type CacheWrap interface { - ... + ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listener) CacheWrap + CacheWrapWithListeners(listeners []Listening) CacheWrap } type CacheWrapper interface { ... // CacheWrapWithListeners recursively wraps again with listening enabled - CacheWrapWithListeners(listeners []Listener) CacheWrap + CacheWrapWithListeners(listeners []Listening) CacheWrap } ``` ### MultiStore implementation updates -We will modify all of the Stores and MultiStores to satisfy these new interfaces, and adjust the `rootmulti` MultiStore's `GetKVStore` method -to enable wrapping the returned `KVStore` with the `listenkv.Store`. +We will modify all of the `Store` and `MultiStore` implementations to satisfy these new interfaces, and adjust the `rootmulti` `GetKVStore` method +to wrap the returned `KVStore` with a `listenkv.Store` if listening is turned on. ```go func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { @@ -252,7 +255,8 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { } ``` -We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable cache listening when `CacheListening` is turned on. +We will also adjust the `cachemulti` constructor methods and the `rootmulti` `CacheMultiStore` method to enable listening +in the cache layer when `CacheListening` is turned on. ```go func (rs *Store) CacheMultiStore() types.CacheMultiStore { @@ -260,7 +264,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { for k, v := range rs.stores { stores[k] = v } - var cacheListeners map[types.StoreKey][]types.Listener + var cacheListeners map[types.StoreKey][]types.Listening if rs.cacheListening { cacheListeners = rs.listeners } @@ -273,26 +277,44 @@ We will introduce and document mechanisms for exposing data from the above liste #### Writing to file We will document and provide examples of how to configure a listener to write out to a file. -No new type implementation is needed, a `os.File` can be used as the underlying `io.Writer` for a listener. +No new type implementation will be needed, a `os.File` can be used as the underlying `io.Writer` for a listener. + Writing to a file is the simplest approach for streaming the data out to consumers. -This approach also provide the advantages of being persistent and durable. -The files can be read directly or an auxiliary streaming services can tail the files and serve the data remotely. +This approach also provide the advantages of being persistent and durable, and the files can be read directly +or an auxiliary streaming services can tail the files and serve the data remotely. + Without pruning the file size can grow indefinitely, this will need to be managed by the developer in an application or even module-specific manner. #### Writing to gRPC stream -We will implement a `io.Writer` type for exposing our listeners over a gRPC server stream. -Writing to a gRPC stream gRPC allows us to expose the data over the standard gRPC interface. -This interface can be exposed directly to consumers or we can implement a message queue or streaming service logic on top. -Using gRPC provides us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client side code generation, and interoperability with the many gRPC plugins and auxillary services. +We will implement and document an `io.Writer` type for exposing our listeners over a gRPC server stream. + +Writing to a gRPC stream gRPC will allow us to expose the data over the standard gRPC interface. +This interface can be exposed directly to consumers or we can implement a message queue or secondary streaming service on top. +Using gRPC will provide us with all of the regular advantages of gRPC and protobuf: versioning guarantees, client side code generation, and interoperability with the many gRPC plugins and auxillary services. + Proceeding through a gRPC intermediate will provide additional overhead, in most cases this is not expected to be rate limiting but in instances where it is the developer can implement a more performant streaming mechanism for state listening. ### Configuration -We will provide detailed documentation for how to configure the state listeners and their external streaming services from within an app's `AppCreator`, -using the provided `AppOptions`. +We will provide detailed documentation on how to configure the state listeners and their external streaming services from within an app's `AppCreator`, +using the provided `AppOptions`. We will add two methods to the `BaseApp` to enable this configuration: + +```go +// SetCommitMultiStoreListeners sets the KVStore listeners for the provided StoreKey +func (app *BaseApp) SetCommitMultiStoreListeners(key sdk.StoreKey, listeners []storeTypes.Listening) { + app.cms.SetListeners(key, listeners) +} + +// SetCacheListening turns on or off listening at the cache layer +func (app *BaseApp) SetCacheListening(listening bool) { + app.cms.CacheListening(listening) +} +``` + +As a demonstration, we will implement the state watching features as part of SimApp. +For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function: -e.g. SimApp with simple state streaming to files: ```go func NewSimApp( @@ -341,7 +363,7 @@ func loadListener(bApp *baseapp.BaseApp, writeDir string, key sdk.StoreKey) { if err != nil { tmos.Exit(err.Error()) } - // using single listener with all operations and keys permitted + // using single listener with all operations and keys permitted and no TraceContext listener := storeTypes.NewDefaultStateListener(fileHandler, nil) bApp.SetCommitMultiStoreListeners(key, []storeTypes.Listening{listener}) }