From 1d6adca58c14b64af9417a4de6b842fc59d942a5 Mon Sep 17 00:00:00 2001 From: yihuang Date: Mon, 3 Oct 2022 21:26:41 +0800 Subject: [PATCH] feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (#8664) (#13326) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (#8664) Hello 👋 this PR introduces the second stage of changes to support [ADR-038](https://github.com/cosmos/cosmos-sdk/pull/8012) state listening. This is rebased on top of the [first segment](https://github.com/cosmos/cosmos-sdk/pull/8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type. In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level. The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here. Thanks! This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md) --- Before we can merge this PR, please make sure that all the following items have been checked off. If any of the checklist items are not applicable, please leave them but write a little note why. - [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work. - [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md). - [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] Updated relevant documentation (`docs/`) or specification (`x//spec/`) - [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code). - [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md` - [x] Re-reviewed `Files changed` in the Github PR explorer - [x] Review `Codecov Report` in the comment section below once CI passes * fix lint * Update CHANGELOG.md Co-authored-by: Ian Norden --- CHANGELOG.md | 4 + baseapp/abci.go | 25 +- baseapp/baseapp.go | 17 +- baseapp/options.go | 11 + baseapp/streaming.go | 217 ++-------- docs/architecture/adr-038-state-listening.md | 334 +++++++++------- simapp/app.go | 11 +- store/cachemulti/store.go | 21 +- store/streaming/README.md | 73 +++- store/streaming/constructor.go | 137 +++++++ store/streaming/constructor_test.go | 43 ++ store/streaming/file/README.md | 64 +++ store/streaming/file/example_config.toml | 10 + store/streaming/file/service.go | 278 +++++++++++++ store/streaming/file/service_test.go | 400 +++++++++++++++++++ 15 files changed, 1253 insertions(+), 392 deletions(-) create mode 100644 store/streaming/constructor.go create mode 100644 store/streaming/constructor_test.go create mode 100644 store/streaming/file/README.md create mode 100644 store/streaming/file/example_config.toml create mode 100644 store/streaming/file/service.go create mode 100644 store/streaming/file/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 39606a8eae91..019d04314d7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,10 @@ Ref: https://keepachangelog.com/en/1.0.0/ - (cli) [#13089](https://github.com/cosmos/cosmos-sdk/pull/13089) Fix rollback command don't actually delete multistore versions, added method `RollbackToVersion` to interface `CommitMultiStore` and added method `CommitMultiStore` to `Application` interface. +### Improvements + +* (store) [\#13326](https://github.com/cosmos/cosmos-sdk/pull/13326) Implementation of ADR-038 file StreamingService, backport #8664 + ## v0.45.8 - 2022-08-25 ### Improvements diff --git a/baseapp/abci.go b/baseapp/abci.go index 7dde25cd6d1a..834dfb940fe7 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -242,6 +242,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg } // set the signed validators for addition to context in deliverTx app.voteInfos = req.LastCommitInfo.GetVotes() + + // call the hooks with the BeginBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err) + } + } + return res } @@ -264,7 +272,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc res.ConsensusParamUpdates = cp } - app.deliverState.eventHistory = []abci.Event{} + // call the streaming service hooks with the EndBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err) + } + } return res } @@ -310,9 +323,17 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { // Otherwise, the ResponseDeliverTx will contain releveant error information. // Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant // gas execution context. -func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { +func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) { defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx") + defer func() { + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("DeliverTx listening hook failed", "err", err) + } + } + }() + gInfo := sdk.GasInfo{} resultStr := "successful" diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 85b77f996050..e77209242e55 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -167,20 +167,9 @@ type BaseApp struct { //nolint: maligned // which informs CometBFT what to index. If empty, all events will be indexed. indexEvents map[string]struct{} - // streamingManager for managing instances and configuration of ABCIListener services - streamingManager storetypes.StreamingManager - - chainID string - - cdc codec.Codec - - // optimisticExec contains the context required for Optimistic Execution, - // including the goroutine handling.This is experimental and must be enabled - // by developers. - optimisticExec *oe.OptimisticExecution - - // includeNestedMsgsGas holds a set of message types for which gas costs for its nested messages are calculated. - includeNestedMsgsGas map[string]struct{} + // abciListeners for hooking into the ABCI message processing of the BaseApp + // and exposing the requests and responses to external consumers + abciListeners []ABCIListener } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a diff --git a/baseapp/options.go b/baseapp/options.go index 10e4f3de9232..a828c64a593f 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -397,3 +397,14 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) { func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) { app.grpcQueryRouter = grpcQueryRouter } + +// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore +func (app *BaseApp) SetStreamingService(s StreamingService) { + // add the listeners for each StoreKey + for key, lis := range s.Listeners() { + app.cms.AddListeners(key, lis) + } + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, s) +} diff --git a/baseapp/streaming.go b/baseapp/streaming.go index 6eeb8e37b449..39e0f1ca6e9b 100644 --- a/baseapp/streaming.go +++ b/baseapp/streaming.go @@ -1,200 +1,33 @@ package baseapp import ( - "context" - "fmt" - "sort" - "strings" + "io" + "sync" - abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" - "github.com/spf13/cast" + abci "github.com/tendermint/tendermint/abci/types" - "cosmossdk.io/log" - "cosmossdk.io/schema" - "cosmossdk.io/schema/appdata" - "cosmossdk.io/schema/decoding" - "cosmossdk.io/schema/indexer" - "cosmossdk.io/store/streaming" - storetypes "cosmossdk.io/store/types" - - "github.com/cosmos/cosmos-sdk/client/flags" - servertypes "github.com/cosmos/cosmos-sdk/server/types" -) - -const ( - StreamingTomlKey = "streaming" - StreamingABCITomlKey = "abci" - StreamingABCIPluginTomlKey = "plugin" - StreamingABCIKeysTomlKey = "keys" - StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err" + store "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/types" ) -// EnableIndexer enables the built-in indexer with the provided options (usually from the app.toml indexer key), -// kv-store keys, and app modules. Using the built-in indexer framework is mutually exclusive from using other -// types of streaming listeners. -func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error { - listener, err := indexer.StartManager(indexer.ManagerOptions{ - Config: indexerOpts, - Resolver: decoding.ModuleSetDecoderResolver(appModules), - SyncSource: nil, - Logger: app.logger.With(log.ModuleKey, "indexer"), - }) - if err != nil { - return err - } - - exposedKeys := exposeStoreKeysSorted([]string{"*"}, keys) - app.cms.AddListeners(exposedKeys) - - app.streamingManager = storetypes.StreamingManager{ - ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener}}, - StopNodeOnErr: true, - } - - return nil -} - -// RegisterStreamingServices registers streaming services with the BaseApp. -func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error { - // register streaming services - streamingCfg := cast.ToStringMap(appOpts.Get(StreamingTomlKey)) - for service := range streamingCfg { - pluginKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, service, StreamingABCIPluginTomlKey) - pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey))) - if len(pluginName) > 0 { - logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel)) - plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel) - if err != nil { - return fmt.Errorf("failed to load streaming plugin: %w", err) - } - if err := app.registerStreamingPlugin(appOpts, keys, plugin); err != nil { - return fmt.Errorf("failed to register streaming plugin %w", err) - } - } - } - - return nil -} - -// registerStreamingPlugin registers streaming plugins with the BaseApp. -func (app *BaseApp) registerStreamingPlugin( - appOpts servertypes.AppOptions, - keys map[string]*storetypes.KVStoreKey, - streamingPlugin interface{}, -) error { - v, ok := streamingPlugin.(storetypes.ABCIListener) - if !ok { - return fmt.Errorf("unexpected plugin type %T", v) - } - - app.registerABCIListenerPlugin(appOpts, keys, v) - return nil -} - -// registerABCIListenerPlugin registers plugins that implement the ABCIListener interface. -func (app *BaseApp) registerABCIListenerPlugin( - appOpts servertypes.AppOptions, - keys map[string]*storetypes.KVStoreKey, - abciListener storetypes.ABCIListener, -) { - stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey) - stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey)) - keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey) - exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey)) - exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys) - app.cms.AddListeners(exposedKeys) - app.SetStreamingManager( - storetypes.StreamingManager{ - ABCIListeners: []storetypes.ABCIListener{abciListener}, - StopNodeOnErr: stopNodeOnErr, - }, - ) -} - -func exposeAll(list []string) bool { - for _, ele := range list { - if ele == "*" { - return true - } - } - return false -} - -func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey { - var exposeStoreKeys []storetypes.StoreKey - if exposeAll(keysStr) { - exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keys)) - for key := range keys { - exposeStoreKeys = append(exposeStoreKeys, keys[key]) - } - } else { - exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keysStr)) - for _, keyStr := range keysStr { - if storeKey, ok := keys[keyStr]; ok { - exposeStoreKeys = append(exposeStoreKeys, storeKey) - } - } - } - // sort storeKeys for deterministic output - sort.SliceStable(exposeStoreKeys, func(i, j int) bool { - return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name() - }) - - return exposeStoreKeys -} - -type listenerWrapper struct { - listener appdata.Listener -} - -func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error { - if p.listener.StartBlock != nil { - err := p.listener.StartBlock(appdata.StartBlockData{ - Height: uint64(req.Height), - }) - if err != nil { - return err - } - } - - //// TODO txs, events - - return nil -} - -func (p listenerWrapper) ListenCommit(ctx context.Context, res abci.CommitResponse, changeSet []*storetypes.StoreKVPair) error { - if cb := p.listener.OnKVPair; cb != nil { - updates := make([]appdata.ActorKVPairUpdate, len(changeSet)) - for i, pair := range changeSet { - updates[i] = appdata.ActorKVPairUpdate{ - Actor: []byte(pair.StoreKey), - StateChanges: []schema.KVPairUpdate{ - { - Key: pair.Key, - Value: pair.Value, - Remove: pair.Delete, - }, - }, - } - } - err := cb(appdata.KVPairData{Updates: updates}) - if err != nil { - return err - } - } - - if p.listener.Commit != nil { - commitCb, err := p.listener.Commit(appdata.CommitData{}) - if err != nil { - return err - } - if commitCb != nil { - err := commitCb() - if err != nil { - return err - } - } - } - - return nil +// ABCIListener interface used to hook into the ABCI message processing of the BaseApp +type ABCIListener interface { + // ListenBeginBlock updates the streaming service with the latest BeginBlock messages + ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + // ListenEndBlock updates the steaming service with the latest EndBlock messages + ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error + // ListenDeliverTx updates the steaming service with the latest DeliverTx messages + ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error +} + +// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks +type StreamingService interface { + // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file + Stream(wg *sync.WaitGroup) error + // Listeners returns the streaming service's listeners for the BaseApp to register + Listeners() map[store.StoreKey][]store.WriteListener + // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp + ABCIListener + // Closer interface + io.Closer } diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index db0620c55441..3c360ede4751 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -35,9 +35,12 @@ In a new file, `store/types/listening.go`, we will create a `MemoryListener` str The `MemoryListener` will be used internally by the concrete `rootmulti` implementation to collect state changes from KVStores. ```go -// MemoryListener listens to the state writes and accumulate the records in memory. -type MemoryListener struct { - stateCache []StoreKVPair +// WriteListener interface for streaming data out from a listenkv.Store +type WriteListener interface { + // if value is nil then it was deleted + // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores + // delete bool indicates if it was a delete; true: delete, false: set + OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } // NewMemoryListener creates a listener that accumulate the state writes in memory. @@ -213,20 +216,31 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor ### Exposing the data -#### Streaming Service - -We will introduce a new `ABCIListener` interface that plugs into the BaseApp and relays ABCI requests and responses -so that the service can group the state changes with the ABCI requests. +We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. +In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp +and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected. ```go -// baseapp/streaming.go - -// ABCIListener is the interface that we're exposing as a streaming service. +// ABCIListener interface used to hook into the ABCI message processing of the BaseApp type ABCIListener interface { - // ListenFinalizeBlock updates the streaming service with the latest FinalizeBlock messages - ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error - // ListenCommit updates the steaming service with the latest Commit messages and state changes - ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*StoreKVPair) error + // ListenBeginBlock updates the streaming service with the latest BeginBlock messages + ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + // ListenEndBlock updates the steaming service with the latest EndBlock messages + ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error + // ListenDeliverTx updates the steaming service with the latest DeliverTx messages + ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error +} + +// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks +type StreamingService interface { + // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file + Stream(wg *sync.WaitGroup) error + // Listeners returns the streaming service's listeners for the BaseApp to register + Listeners() map[types.StoreKey][]store.WriteListener + // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp + ABCIListener + // Closer interface + io.Closer } ``` @@ -234,16 +248,45 @@ type ABCIListener interface { We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s: - ```go - // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore -func (app *BaseApp) SetStreamingService(s ABCIListener) { - // register the StreamingService within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.abciListeners = append(app.abciListeners, s) -} -``` +Writing to a file is the simplest approach for streaming the data out to consumers. +This approach also provides the advantages of being persistent and durable, and the files can be read directly, +or an auxiliary streaming services can read from the files and serve the data over a remote interface. + +##### Encoding + +For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M +is the tx number in the block (i.e. 0, 1, 2...). +At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. +At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. +In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. -We will add two new fields to the `BaseApp` struct: +For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +##### Decoding + +To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto +messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request, +the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into +the appropriate message type. + +The type of ABCI req/res, the block height, and the transaction index (where relevant) is known +from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. + +##### Implementation example ```go type BaseApp struct { @@ -364,19 +407,7 @@ type ABCIListenerPlugin struct { Impl baseapp.ABCIListener } -func (p *ListenerGRPCPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { - RegisterABCIListenerServiceServer(s, &GRPCServer{Impl: p.Impl}) - return nil -} - -func (p *ListenerGRPCPlugin) GRPCClient( - _ context.Context, - _ *plugin.GRPCBroker, - c *grpc.ClientConn, -) (interface{}, error) { - return &GRPCClient{client: NewABCIListenerServiceClient(c)}, nil -} -``` +#### Auxiliary streaming service The `plugin.Plugin` interface has two methods `Client` and `Server`. For our GRPC service these are `GRPCClient` and `GRPCServer` The `Impl` field holds the concrete implementation of our `baseapp.ABCIListener` interface written in Go. @@ -410,12 +441,16 @@ service ABCIListenerService { } ``` -```protobuf -... -// plugin that doesn't listen to state changes -service ABCIListenerService { - rpc ListenFinalizeBlock(ListenFinalizeBlockRequest) returns (Empty); - rpc ListenCommit(ListenCommitRequest) returns (Empty); +```go +// SetStreamingService is used to register a streaming service with the BaseApp +func (app *BaseApp) SetStreamingService(s StreamingService) { + // set the listeners for each StoreKey + for key, lis := range s.Listeners() { + app.cms.AddListeners(key, lis) + } + // register the streaming service hooks within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks + app.hooks = append(app.hooks, s) } ``` @@ -486,128 +521,80 @@ func main() { "grpc_plugin_v1": &grpc_abci_v1.ABCIListenerGRPCPlugin{Impl: &ABCIListenerPlugin{}}, }, - // A non-nil value here enables gRPC serving for this streaming... - GRPCServer: plugin.DefaultGRPCServer, - }) -} +```toml +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" ``` We will introduce a plugin loading system that will return `(interface{}, error)`. This provides the advantage of using versioned plugins where the plugin interface and gRPC protocol change over time. In addition, it allows for building independent plugin that can expose different parts of the system over gRPC. -```go -func NewStreamingPlugin(name string, logLevel string) (interface{}, error) { - logger := hclog.New(&hclog.LoggerOptions{ - Output: hclog.DefaultOutput, - Level: toHclogLevel(logLevel), - Name: fmt.Sprintf("plugin.%s", name), - }) - - // We're a host. Start by launching the streaming process. - env := os.Getenv(GetPluginEnvKey(name)) - client := plugin.NewClient(&plugin.ClientConfig{ - HandshakeConfig: HandshakeMap[name], - Plugins: PluginMap, - Cmd: exec.Command("sh", "-c", env), - Logger: logger, - AllowedProtocols: []plugin.Protocol{ - plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, - }) - - // Connect via RPC - rpcClient, err := client.Client() - if err != nil { - return nil, err - } - - // Request streaming plugin - return rpcClient.Dispense(name) -} - -``` - -We propose a `RegisterStreamingPlugin` function for the App to register `NewStreamingPlugin`s with the App's BaseApp. -Streaming plugins can be of `Any` type; therefore, the function takes in an interface vs a concrete type. -For example, we could have plugins of `ABCIListener`, `WasmListener` or `IBCListener`. Note that `RegisterStreamingPluing` function -is helper function and not a requirement. Plugin registration can easily be moved from the App to the BaseApp directly. +Each configured streamer will receive the ```go -// baseapp/streaming.go +// ServiceConstructor is used to construct a streaming service +type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) -// RegisterStreamingPlugin registers streaming plugins with the App. -// This method returns an error if a plugin is not supported. -func RegisterStreamingPlugin( - bApp *BaseApp, - appOpts servertypes.AppOptions, - keys map[string]*types.KVStoreKey, - streamingPlugin interface{}, -) error { - switch t := streamingPlugin.(type) { - case ABCIListener: - registerABCIListenerPlugin(bApp, appOpts, keys, t) - default: - return fmt.Errorf("unexpected plugin type %T", t) - } - return nil -} -``` +// ServiceType enum for specifying the type of StreamingService +type ServiceType int -```go -func registerABCIListenerPlugin( - bApp *BaseApp, - appOpts servertypes.AppOptions, - keys map[string]*store.KVStoreKey, - abciListener ABCIListener, -) { - asyncKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIAsync) - async := cast.ToBool(appOpts.Get(asyncKey)) - stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey) - stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey)) - keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey) - exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey)) - exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys) - bApp.cms.AddListeners(exposedKeys) - app.SetStreamingManager( - storetypes.StreamingManager{ - ABCIListeners: []storetypes.ABCIListener{abciListener}, - StopNodeOnErr: stopNodeOnErr, - }, - ) -} -``` - -```go -func exposeAll(list []string) bool { - for _, ele := range list { - if ele == "*" { - return true - } - } - return false -} - -func exposeStoreKeys(keysStr []string, keys map[string]*types.KVStoreKey) []types.StoreKey { - var exposeStoreKeys []types.StoreKey - if exposeAll(keysStr) { - exposeStoreKeys = make([]types.StoreKey, 0, len(keys)) - for _, storeKey := range keys { - exposeStoreKeys = append(exposeStoreKeys, storeKey) - } - } else { - exposeStoreKeys = make([]types.StoreKey, 0, len(keysStr)) - for _, keyStr := range keysStr { - if storeKey, ok := keys[keyStr]; ok { - exposeStoreKeys = append(exposeStoreKeys, storeKey) - } - } - } - // sort storeKeys for deterministic output - sort.SliceStable(exposeStoreKeys, func(i, j int) bool { - return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name() - }) +const ( + Unknown ServiceType = iota + File + // add more in the future +) - return exposeStoreKeys +// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name +func NewStreamingServiceType(name string) ServiceType { + switch strings.ToLower(name) { + case "file", "f": + return File + default: + return Unknown + } +} + +// String returns the string name of a streaming.ServiceType +func (sst ServiceType) String() string { + switch sst { + case File: + return "file" + default: + return "" + } +} + +// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors +var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ + File: NewFileStreamingService, +} + +// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name +func ServiceTypeFromString(name string) (ServiceConstructor, error) { + ssType := NewStreamingServiceType(name) + if ssType == Unknown { + return nil, fmt.Errorf("unrecognized streaming service name %s", name) + } + if constructor, ok := ServiceConstructorLookupTable[ssType]; ok { + return constructor, nil + } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) +} + +// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService +func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) { + filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) + fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) + return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) } ``` @@ -656,7 +643,46 @@ func NewSimApp( return app ``` -#### Configuration + // configure state listening capabilities using AppOptions + listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) + for _, listenerName := range listeners { + // get the store keys allowed to be exposed for this streaming service + exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) + var exposeStoreKeys []sdk.StoreKey + if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys + exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } else { + exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs)) + for _, keyStr := range exposeKeyStrs { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + } + if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything + continue + } + // get the constructor for this listener name + constructor, err := baseapp.NewStreamingServiceConstructor(listenerName) + if err != nil { + tmos.Exit(err.Error()) // or continue? + } + // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose + streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) + if err != nil { + tmos.Exit(err.Error()) + } + // register the streaming service with the BaseApp + bApp.RegisterStreamingService(streamingService) + // waitgroup and quit channel for optional shutdown coordination of the streaming service + wg := new(sync.WaitGroup) + quitChan := make(chan struct{})) + // kick off the background streaming service loop + streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead? + } The plugin system will be configured within an App's TOML configuration files. diff --git a/simapp/app.go b/simapp/app.go index c6ec1ab13e68..4a2915022d46 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -34,8 +34,9 @@ import ( "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" - "github.com/cosmos/cosmos-sdk/std" - testdata_pulsar "github.com/cosmos/cosmos-sdk/testutil/testdata/testpb" + simappparams "github.com/cosmos/cosmos-sdk/simapp/params" + "github.com/cosmos/cosmos-sdk/store/streaming" + "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" "github.com/cosmos/cosmos-sdk/types/msgservice" @@ -225,6 +226,12 @@ func NewSimApp( // not include this key. memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") + // configure state listening capabilities using AppOptions + // we are doing nothing with the returned streamingServices and waitGroup in this case + if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil { + tmos.Exit(err.Error()) + } + app := &SimApp{ BaseApp: bApp, logger: logger, diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index e80d1fa4bb37..f60fcdcc9316 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -4,11 +4,13 @@ import ( "fmt" "io" - corestore "cosmossdk.io/core/store" - "cosmossdk.io/store/cachekv" - "cosmossdk.io/store/dbadapter" - "cosmossdk.io/store/tracekv" - "cosmossdk.io/store/types" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" ) // storeNameCtxKey is the TraceContext metadata key that identifies @@ -50,11 +52,10 @@ func NewFromKVStore( for key, store := range stores { if cms.TracingEnabled() { - tctx := cms.traceContext.Clone().Merge(types.TraceContext{ - storeNameCtxKey: key.Name(), - }) - - store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx) + store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, cms.traceContext) + } + if cms.ListeningEnabled(key) { + store = listenkv.NewStore(store.(types.KVStore), key, listeners[key]) } cms.stores[key] = cachekv.NewStore(store.(types.KVStore)) } diff --git a/store/streaming/README.md b/store/streaming/README.md index faa304dec091..819514aef796 100644 --- a/store/streaming/README.md +++ b/store/streaming/README.md @@ -1,30 +1,67 @@ -# Cosmos-SDK Plugins +# State Streaming Service +This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a +file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go). +The child directories contain the implementations for specific output destinations. -This package contains an extensible plugin system for the Cosmos-SDK. The plugin system leverages the [hashicorp/go-plugin](https://github.com/hashicorp/go-plugin) system. This system is designed to work over RPC. +Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional +output destinations can be added. -Although the `go-plugin` is built to work over RPC, it is currently only designed to work over a local network. +The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: -## Pre requisites +```toml +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] -For an overview of supported features by the `go-plugin` system, please see https://github.com/hashicorp/go-plugin. The `go-plugin` documentation is located [here](https://github.com/hashicorp/go-plugin/tree/master/docs). You can also directly visit any of the links below: +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" +``` -* [Writing plugins without Go](https://github.com/hashicorp/go-plugin/blob/master/docs/guide-plugin-write-non-go.md) -* [Go Plugin Tutorial](https://github.com/hashicorp/go-plugin/blob/master/docs/extensive-go-plugin-tutorial.md) -* [Plugin Internals](https://github.com/hashicorp/go-plugin/blob/master/docs/internals.md) -* [Plugin Architecture](https://www.youtube.com/watch?v=SRvm3zQQc1Q) (start here) +`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString` +to return the `ServiceConstructor` for that particular implementation: -## Exposing plugins + +```go +listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) +for _, listenerName := range listeners { + constructor, err := ServiceTypeFromString(listenerName) + if err != nil { + // handle error + } +} +``` -To expose plugins to the plugin system, you will need to: +`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service. +`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`. +In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. -1. Implement the gRPC message protocol service of the plugin -2. Build the plugin binary -3. Export it +Additional configuration parameters are optional and specific to the implementation. +In the case of the file streaming service, `streamers.file.write_dir` contains the path to the +directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions +with other App `StreamingService` output files. -Read the plugin documentation in the [Streaming Plugins](#streaming-plugins) section for examples on how to build a plugin. +The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and +returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options, +e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`. -## Streaming Plugins +```go +streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) +if err != nil { + // handler error +} +``` -List of support streaming plugins +The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method. +The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process +may be synchronous or asynchronous with the message processing of the state machine. -* [ABCI State Streaming Plugin](abci/README.md) +```go +bApp.SetStreamingService(streamingService) +wg := new(sync.WaitGroup) +quitChan := make(chan struct{}) +streamingService.Stream(wg, quitChan) +``` diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go new file mode 100644 index 000000000000..e576f84b83d1 --- /dev/null +++ b/store/streaming/constructor.go @@ -0,0 +1,137 @@ +package streaming + +import ( + "fmt" + "strings" + "sync" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + serverTypes "github.com/cosmos/cosmos-sdk/server/types" + "github.com/cosmos/cosmos-sdk/store/streaming/file" + "github.com/cosmos/cosmos-sdk/store/types" + + "github.com/spf13/cast" +) + +// ServiceConstructor is used to construct a streaming service +type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) + +// ServiceType enum for specifying the type of StreamingService +type ServiceType int + +const ( + Unknown ServiceType = iota + File + // add more in the future +) + +// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name +func ServiceTypeFromString(name string) ServiceType { + switch strings.ToLower(name) { + case "file", "f": + return File + default: + return Unknown + } +} + +// String returns the string name of a streaming.ServiceType +func (sst ServiceType) String() string { + switch sst { + case File: + return "file" + default: + return "unknown" + } +} + +// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors +var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ + File: NewFileStreamingService, +} + +// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name +func NewServiceConstructor(name string) (ServiceConstructor, error) { + ssType := ServiceTypeFromString(name) + if ssType == Unknown { + return nil, fmt.Errorf("unrecognized streaming service name %s", name) + } + if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil { + return constructor, nil + } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) +} + +// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService +func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) { + filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) + fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) + return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) +} + +// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys +// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup +func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) { + // waitgroup and quit channel for optional shutdown coordination of the streaming service(s) + wg := new(sync.WaitGroup) + // configure state listening capabilities using AppOptions + streamers := cast.ToStringSlice(appOpts.Get("store.streamers")) + activeStreamers := make([]baseapp.StreamingService, 0, len(streamers)) + for _, streamerName := range streamers { + // get the store keys allowed to be exposed for this streaming service + exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) + var exposeStoreKeys []types.StoreKey + if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys + exposeStoreKeys = make([]types.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } else { + exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs)) + for _, keyStr := range exposeKeyStrs { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + } + if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything + continue + } + // get the constructor for this streamer name + constructor, err := NewServiceConstructor(streamerName) + if err != nil { + // close any services we may have already spun up before hitting the error on this one + for _, activeStreamer := range activeStreamers { + activeStreamer.Close() + } + return nil, nil, err + } + // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose + streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) + if err != nil { + // close any services we may have already spun up before hitting the error on this one + for _, activeStreamer := range activeStreamers { + activeStreamer.Close() + } + return nil, nil, err + } + // register the streaming service with the BaseApp + bApp.SetStreamingService(streamingService) + // kick off the background streaming service loop + streamingService.Stream(wg) + // add to the list of active streamers + activeStreamers = append(activeStreamers, streamingService) + } + // if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything + return activeStreamers, wg, nil +} + +func exposeAll(list []string) bool { + for _, ele := range list { + if ele == "*" { + return true + } + } + return false +} diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go new file mode 100644 index 000000000000..5f9d58016f68 --- /dev/null +++ b/store/streaming/constructor_test.go @@ -0,0 +1,43 @@ +package streaming + +import ( + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/streaming/file" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/stretchr/testify/require" +) + +type fakeOptions struct{} + +func (f *fakeOptions) Get(string) interface{} { return nil } + +var ( + mockOptions = new(fakeOptions) + mockKeys = []types.StoreKey{sdk.NewKVStoreKey("mockKey1"), sdk.NewKVStoreKey("mockKey2")} + interfaceRegistry = codecTypes.NewInterfaceRegistry() + testMarshaller = codec.NewProtoCodec(interfaceRegistry) +) + +func TestStreamingServiceConstructor(t *testing.T) { + _, err := NewServiceConstructor("unexpectedName") + require.NotNil(t, err) + + constructor, err := NewServiceConstructor("file") + require.Nil(t, err) + var expectedType ServiceConstructor + require.IsType(t, expectedType, constructor) + + serv, err := constructor(mockOptions, mockKeys, testMarshaller) + require.Nil(t, err) + require.IsType(t, &file.StreamingService{}, serv) + listeners := serv.Listeners() + for _, key := range mockKeys { + _, ok := listeners[key] + require.True(t, ok) + } +} diff --git a/store/streaming/file/README.md b/store/streaming/file/README.md new file mode 100644 index 000000000000..7243f15dd0bf --- /dev/null +++ b/store/streaming/file/README.md @@ -0,0 +1,64 @@ +# File Streaming Service +This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes +the data stream out to files on the local filesystem. This process is performed synchronously with the message processing +of the state machine. + +## Configuration + +The `file.StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: + +```toml +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" +``` + +We turn the service on by adding its name, "file", to `store.streamers`- the list of streaming services for this App to employ. + +In `streamers.file` we include three configuration parameters for the file streaming service: +1. `streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service. +In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. +2. `streamers.file.write_dir` contains the path to the directory to write the files to. +3. `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions +with other App `StreamingService` output files. + +##### Encoding + +For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M +is the tx number in the block (i.e. 0, 1, 2...). +At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. +At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. +In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +##### Decoding + +To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto +messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request, +the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into +the appropriate message type. + +The type of ABCI req/res, the block height, and the transaction index (where relevant) is known +from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. diff --git a/store/streaming/file/example_config.toml b/store/streaming/file/example_config.toml new file mode 100644 index 000000000000..8202bd8ef559 --- /dev/null +++ b/store/streaming/file/example_config.toml @@ -0,0 +1,10 @@ +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go new file mode 100644 index 000000000000..685ba7d9d770 --- /dev/null +++ b/store/streaming/file/service.go @@ -0,0 +1,278 @@ +package file + +import ( + "errors" + "fmt" + "os" + "path" + "path/filepath" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ baseapp.StreamingService = &StreamingService{} + +// StreamingService is a concrete implementation of StreamingService that writes state changes out to files +type StreamingService struct { + listeners map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp + srcChan <-chan []byte // the channel that all the WriteListeners write their data out to + filePrefix string // optional prefix for each of the generated files + writeDir string // directory to write files into + codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files + stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received + stateCacheLock *sync.Mutex // mutex for the state cache + currentBlockNumber int64 // the current block number + currentTxIndex int64 // the index of the current tx + quitChan chan struct{} // channel to synchronize closure +} + +// IntermediateWriter is used so that we do not need to update the underlying io.Writer +// inside the StoreKVPairWriteListener everytime we begin writing to a new file +type IntermediateWriter struct { + outChan chan<- []byte +} + +// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel +func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter { + return &IntermediateWriter{ + outChan: outChan, + } +} + +// Write satisfies io.Writer +func (iw *IntermediateWriter) Write(b []byte) (int, error) { + iw.outChan <- b + return len(b), nil +} + +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) { + listenChan := make(chan []byte) + iw := NewIntermediateWriter(listenChan) + listener := types.NewStoreKVPairWriteListener(iw, c) + listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys)) + // in this case, we are using the same listener for each Store + for _, key := range storeKeys { + listeners[key] = append(listeners[key], listener) + } + // check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not + // we don't open a dstFile until we receive our first ABCI message + if err := isDirWriteable(writeDir); err != nil { + return nil, err + } + return &StreamingService{ + listeners: listeners, + srcChan: listenChan, + filePrefix: filePrefix, + writeDir: writeDir, + codec: c, + stateCache: make([][]byte, 0), + stateCacheLock: new(sync.Mutex), + }, nil +} + +// Listeners satisfies the baseapp.StreamingService interface +// It returns the StreamingService's underlying WriteListeners +// Use for registering the underlying WriteListeners with the BaseApp +func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { + return fss.listeners +} + +// ListenBeginBlock satisfies the baseapp.ABCIListener interface +// It writes the received BeginBlock request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + // generate the new file + dstFile, err := fss.openBeginBlockFile(req) + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) { + fss.currentBlockNumber = req.GetHeader().Height + fss.currentTxIndex = 0 + fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600) +} + +// ListenDeliverTx satisfies the baseapp.ABCIListener interface +// It writes the received DeliverTx request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + // generate the new file + dstFile, err := fss.openDeliverTxFile() + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openDeliverTxFile() (*os.File, error) { + fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + fss.currentTxIndex++ + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600) +} + +// ListenEndBlock satisfies the baseapp.ABCIListener interface +// It writes the received EndBlock request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + // generate the new file + dstFile, err := fss.openEndBlockFile() + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openEndBlockFile() (*os.File, error) { + fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0o600) +} + +// Stream satisfies the baseapp.StreamingService interface +// It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs +// and caches them in the order they were received +// returns an error if it is called twice +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + if fss.quitChan != nil { + return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again") + } + fss.quitChan = make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-fss.quitChan: + fss.quitChan = nil + return + case by := <-fss.srcChan: + fss.stateCacheLock.Lock() + fss.stateCache = append(fss.stateCache, by) + fss.stateCacheLock.Unlock() + } + } + }() + return nil +} + +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { + close(fss.quitChan) + return nil +} + +// isDirWriteable checks if dir is writable by writing and removing a file +// to dir. It returns nil if dir is writable. +func isDirWriteable(dir string) error { + f := path.Join(dir, ".touch") + if err := os.WriteFile(f, []byte(""), 0o600); err != nil { + return err + } + return os.Remove(f) +} diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go new file mode 100644 index 000000000000..46f1434beeb2 --- /dev/null +++ b/store/streaming/file/service_test.go @@ -0,0 +1,400 @@ +package file + +import ( + "encoding/binary" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + types1 "github.com/tendermint/tendermint/proto/tendermint/types" +) + +var ( + interfaceRegistry = codecTypes.NewInterfaceRegistry() + testMarshaller = codec.NewProtoCodec(interfaceRegistry) + testStreamingService *StreamingService + testListener1, testListener2 types.WriteListener + emptyContext = sdk.Context{} + + // test abci message types + mockHash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9} + testBeginBlockReq = abci.RequestBeginBlock{ + Header: types1.Header{ + Height: 1, + }, + ByzantineValidators: []abci.Evidence{}, + Hash: mockHash, + LastCommitInfo: abci.LastCommitInfo{ + Round: 1, + Votes: []abci.VoteInfo{}, + }, + } + testBeginBlockRes = abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "testEventType1", + }, + { + Type: "testEventType2", + }, + }, + } + testEndBlockReq = abci.RequestEndBlock{ + Height: 1, + } + testEndBlockRes = abci.ResponseEndBlock{ + Events: []abci.Event{}, + ConsensusParamUpdates: &abci.ConsensusParams{}, + ValidatorUpdates: []abci.ValidatorUpdate{}, + } + mockTxBytes1 = []byte{9, 8, 7, 6, 5, 4, 3, 2, 1} + testDeliverTxReq1 = abci.RequestDeliverTx{ + Tx: mockTxBytes1, + } + mockTxBytes2 = []byte{8, 7, 6, 5, 4, 3, 2} + testDeliverTxReq2 = abci.RequestDeliverTx{ + Tx: mockTxBytes2, + } + mockTxResponseData1 = []byte{1, 3, 5, 7, 9} + testDeliverTxRes1 = abci.ResponseDeliverTx{ + Events: []abci.Event{}, + Code: 1, + Codespace: "mockCodeSpace", + Data: mockTxResponseData1, + GasUsed: 2, + GasWanted: 3, + Info: "mockInfo", + Log: "mockLog", + } + mockTxResponseData2 = []byte{1, 3, 5, 7, 9} + testDeliverTxRes2 = abci.ResponseDeliverTx{ + Events: []abci.Event{}, + Code: 1, + Codespace: "mockCodeSpace", + Data: mockTxResponseData2, + GasUsed: 2, + GasWanted: 3, + Info: "mockInfo", + Log: "mockLog", + } + + // mock store keys + mockStoreKey1 = sdk.NewKVStoreKey("mockStore1") + mockStoreKey2 = sdk.NewKVStoreKey("mockStore2") + + // file stuff + testPrefix = "testPrefix" + testDir = "./.test" + + // mock state changes + mockKey1 = []byte{1, 2, 3} + mockValue1 = []byte{3, 2, 1} + mockKey2 = []byte{2, 3, 4} + mockValue2 = []byte{4, 3, 2} + mockKey3 = []byte{3, 4, 5} + mockValue3 = []byte{5, 4, 3} +) + +func TestIntermediateWriter(t *testing.T) { + outChan := make(chan []byte, 0) + iw := NewIntermediateWriter(outChan) + require.IsType(t, &IntermediateWriter{}, iw) + testBytes := []byte{1, 2, 3, 4, 5} + var length int + var err error + waitChan := make(chan struct{}, 0) + go func() { + length, err = iw.Write(testBytes) + waitChan <- struct{}{} + }() + receivedBytes := <-outChan + <-waitChan + require.Equal(t, len(testBytes), length) + require.Equal(t, testBytes, receivedBytes) + require.Nil(t, err) +} + +func TestFileStreamingService(t *testing.T) { + if os.Getenv("CI") != "" { + t.Skip("Skipping TestFileStreamingService in CI environment") + } + err := os.Mkdir(testDir, 0700) + require.Nil(t, err) + defer os.RemoveAll(testDir) + + testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} + testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller) + require.Nil(t, err) + require.IsType(t, &StreamingService{}, testStreamingService) + require.Equal(t, testPrefix, testStreamingService.filePrefix) + require.Equal(t, testDir, testStreamingService.writeDir) + require.Equal(t, testMarshaller, testStreamingService.codec) + testListener1 = testStreamingService.listeners[mockStoreKey1][0] + testListener2 = testStreamingService.listeners[mockStoreKey2][0] + wg := new(sync.WaitGroup) + testStreamingService.Stream(wg) + testListenBeginBlock(t) + testListenDeliverTx1(t) + testListenDeliverTx2(t) + testListenEndBlock(t) + testStreamingService.Close() + wg.Wait() +} + +func testListenBeginBlock(t *testing.T) { + expectedBeginBlockReqBytes, err := testMarshaller.Marshal(&testBeginBlockReq) + require.Nil(t, err) + expectedBeginBlockResBytes, err := testMarshaller.Marshal(&testBeginBlockRes) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey1, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenBeginBlock(emptyContext, testBeginBlockReq, testBeginBlockRes) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-begin", testPrefix, testBeginBlockReq.GetHeader().Height) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedBeginBlockReqBytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedBeginBlockResBytes, segments[4]) +} + +func testListenDeliverTx1(t *testing.T) { + expectedDeliverTxReq1Bytes, err := testMarshaller.Marshal(&testDeliverTxReq1) + require.Nil(t, err) + expectedDeliverTxRes1Bytes, err := testMarshaller.Marshal(&testDeliverTxRes1) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq1, testDeliverTxRes1) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 0) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedDeliverTxReq1Bytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedDeliverTxRes1Bytes, segments[4]) +} + +func testListenDeliverTx2(t *testing.T) { + expectedDeliverTxReq2Bytes, err := testMarshaller.Marshal(&testDeliverTxReq2) + require.Nil(t, err) + expectedDeliverTxRes2Bytes, err := testMarshaller.Marshal(&testDeliverTxRes2) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey2, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq2, testDeliverTxRes2) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 1) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedDeliverTxReq2Bytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedDeliverTxRes2Bytes, segments[4]) +} + +func testListenEndBlock(t *testing.T) { + expectedEndBlockReqBytes, err := testMarshaller.Marshal(&testEndBlockReq) + require.Nil(t, err) + expectedEndBlockResBytes, err := testMarshaller.Marshal(&testEndBlockRes) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenEndBlock(emptyContext, testEndBlockReq, testEndBlockRes) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-end", testPrefix, testEndBlockReq.Height) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedEndBlockReqBytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedEndBlockResBytes, segments[4]) +} + +func readInFile(name string) ([]byte, error) { + path := filepath.Join(testDir, name) + return os.ReadFile(path) +} + +// Returns all of the protobuf messages contained in the byte array as an array of byte arrays +// The messages have their length prefix removed +func segmentBytes(bz []byte) ([][]byte, error) { + var err error + segments := make([][]byte, 0) + for len(bz) > 0 { + var segment []byte + segment, bz, err = getHeadSegment(bz) + if err != nil { + return nil, err + } + segments = append(segments, segment) + } + return segments, nil +} + +// Returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array +func getHeadSegment(bz []byte) ([]byte, []byte, error) { + size, prefixSize := binary.Uvarint(bz) + if prefixSize < 0 { + return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize) + } + if size > uint64(len(bz)-prefixSize) { + return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize) + } + return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil +}