Skip to content

Commit

Permalink
feat: ADR-038 Part 2: StreamingService interface, file writing implem…
Browse files Browse the repository at this point in the history
…entation, and configuration (cosmos#8664) (cosmos#13326)

* feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (cosmos#8664)

<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v                               ✰  Thanks for creating a PR! ✰
v    Before smashing the submit button please review the checkboxes.
v    If a checkbox is n/a - please still include it but + a little note why
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

<!-- Add a description of the changes that this PR introduces and the files that
are the most critical to review.
-->

Hello 👋 this PR introduces the second stage of changes to support [ADR-038](cosmos#8012) state listening. This is rebased on top of the [first segment](cosmos#8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type.

In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level.

The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here.

Thanks!

This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md)

---

Before we can merge this PR, please make sure that all the following items have been
checked off. If any of the checklist items are not applicable, please leave them but
write a little note why.

- [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md).
- [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`)
- [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code).
- [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md`
- [x] Re-reviewed `Files changed` in the Github PR explorer
- [x] Review `Codecov Report` in the comment section below once CI passes

* fix lint

* Update CHANGELOG.md

Co-authored-by: Ian Norden <iansnorden@gmail.com>
  • Loading branch information
2 people authored and JeancarloBarrios committed Sep 28, 2024
1 parent 360e92c commit 1d6adca
Show file tree
Hide file tree
Showing 15 changed files with 1,253 additions and 392 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

- (cli) [#13089](https://github.com/cosmos/cosmos-sdk/pull/13089) Fix rollback command don't actually delete multistore versions, added method `RollbackToVersion` to interface `CommitMultiStore` and added method `CommitMultiStore` to `Application` interface.

### Improvements

* (store) [\#13326](https://github.com/cosmos/cosmos-sdk/pull/13326) Implementation of ADR-038 file StreamingService, backport #8664

## v0.45.8 - 2022-08-25

### Improvements
Expand Down
25 changes: 23 additions & 2 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}

return res
}

Expand All @@ -264,7 +272,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

app.deliverState.eventHistory = []abci.Event{}
// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}

return res
}
Expand Down Expand Up @@ -310,9 +323,17 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")

defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
}()

gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand Down
17 changes: 3 additions & 14 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,9 @@ type BaseApp struct { //nolint: maligned
// which informs CometBFT what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// streamingManager for managing instances and configuration of ABCIListener services
streamingManager storetypes.StreamingManager

chainID string

cdc codec.Codec

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution

// includeNestedMsgsGas holds a set of message types for which gas costs for its nested messages are calculated.
includeNestedMsgsGas map[string]struct{}
// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,14 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
app.grpcQueryRouter = grpcQueryRouter
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}
217 changes: 25 additions & 192 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -1,200 +1,33 @@
package baseapp

import (
"context"
"fmt"
"sort"
"strings"
"io"
"sync"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/spf13/cast"
abci "github.com/tendermint/tendermint/abci/types"

"cosmossdk.io/log"
"cosmossdk.io/schema"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/schema/indexer"
"cosmossdk.io/store/streaming"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/client/flags"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
)

const (
StreamingTomlKey = "streaming"
StreamingABCITomlKey = "abci"
StreamingABCIPluginTomlKey = "plugin"
StreamingABCIKeysTomlKey = "keys"
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// EnableIndexer enables the built-in indexer with the provided options (usually from the app.toml indexer key),
// kv-store keys, and app modules. Using the built-in indexer framework is mutually exclusive from using other
// types of streaming listeners.
func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error {
listener, err := indexer.StartManager(indexer.ManagerOptions{
Config: indexerOpts,
Resolver: decoding.ModuleSetDecoderResolver(appModules),
SyncSource: nil,
Logger: app.logger.With(log.ModuleKey, "indexer"),
})
if err != nil {
return err
}

exposedKeys := exposeStoreKeysSorted([]string{"*"}, keys)
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener}},
StopNodeOnErr: true,
}

return nil
}

// RegisterStreamingServices registers streaming services with the BaseApp.
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
// register streaming services
streamingCfg := cast.ToStringMap(appOpts.Get(StreamingTomlKey))
for service := range streamingCfg {
pluginKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, service, StreamingABCIPluginTomlKey)
pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
if len(pluginName) > 0 {
logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
if err != nil {
return fmt.Errorf("failed to load streaming plugin: %w", err)
}
if err := app.registerStreamingPlugin(appOpts, keys, plugin); err != nil {
return fmt.Errorf("failed to register streaming plugin %w", err)
}
}
}

return nil
}

// registerStreamingPlugin registers streaming plugins with the BaseApp.
func (app *BaseApp) registerStreamingPlugin(
appOpts servertypes.AppOptions,
keys map[string]*storetypes.KVStoreKey,
streamingPlugin interface{},
) error {
v, ok := streamingPlugin.(storetypes.ABCIListener)
if !ok {
return fmt.Errorf("unexpected plugin type %T", v)
}

app.registerABCIListenerPlugin(appOpts, keys, v)
return nil
}

// registerABCIListenerPlugin registers plugins that implement the ABCIListener interface.
func (app *BaseApp) registerABCIListenerPlugin(
appOpts servertypes.AppOptions,
keys map[string]*storetypes.KVStoreKey,
abciListener storetypes.ABCIListener,
) {
stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey)
stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey))
keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey)
exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey))
exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys)
app.cms.AddListeners(exposedKeys)
app.SetStreamingManager(
storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{abciListener},
StopNodeOnErr: stopNodeOnErr,
},
)
}

func exposeAll(list []string) bool {
for _, ele := range list {
if ele == "*" {
return true
}
}
return false
}

func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey {
var exposeStoreKeys []storetypes.StoreKey
if exposeAll(keysStr) {
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keys))
for key := range keys {
exposeStoreKeys = append(exposeStoreKeys, keys[key])
}
} else {
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keysStr))
for _, keyStr := range keysStr {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}
// sort storeKeys for deterministic output
sort.SliceStable(exposeStoreKeys, func(i, j int) bool {
return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name()
})

return exposeStoreKeys
}

type listenerWrapper struct {
listener appdata.Listener
}

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
})
if err != nil {
return err
}
}

//// TODO txs, events

return nil
}

func (p listenerWrapper) ListenCommit(ctx context.Context, res abci.CommitResponse, changeSet []*storetypes.StoreKVPair) error {
if cb := p.listener.OnKVPair; cb != nil {
updates := make([]appdata.ActorKVPairUpdate, len(changeSet))
for i, pair := range changeSet {
updates[i] = appdata.ActorKVPairUpdate{
Actor: []byte(pair.StoreKey),
StateChanges: []schema.KVPairUpdate{
{
Key: pair.Key,
Value: pair.Value,
Remove: pair.Delete,
},
},
}
}
err := cb(appdata.KVPairData{Updates: updates})
if err != nil {
return err
}
}

if p.listener.Commit != nil {
commitCb, err := p.listener.Commit(appdata.CommitData{})
if err != nil {
return err
}
if commitCb != nil {
err := commitCb()
if err != nil {
return err
}
}
}

return nil
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
Loading

0 comments on commit 1d6adca

Please sign in to comment.