Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration #8664

Merged
merged 54 commits into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
0a88c29
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
875f23e
adjust multistores to fit new MultiStore interface and enable wrappin…
i-norden Feb 8, 2021
5e02e3d
update server mock KVStore and MultiStore
i-norden Feb 9, 2021
c394e5d
fix bug identified in CI
i-norden Feb 9, 2021
c92556a
improve codecov, minor fixes/adjustments
i-norden Feb 10, 2021
18bf622
review fixes
i-norden Feb 24, 2021
d92f1d0
review updates; flip set to delete in KVStorePair, updated proto-docs…
i-norden Mar 5, 2021
72d6a88
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
602867e
review fixes
i-norden Feb 24, 2021
93dbea9
adjust KVStores to fit new CacheWrapper interface
i-norden Feb 8, 2021
441af0e
review fixes
i-norden Feb 24, 2021
f7ecbcb
hook and streaming service interfaces
i-norden Feb 8, 2021
5f74b68
integrate Hooks and StreamingService into BaseApp
i-norden Feb 8, 2021
d3c44a4
begin file streaming service implementation
i-norden Feb 8, 2021
7cc4dbd
update Hook interface to return errors so that they can be logged at …
i-norden Feb 11, 2021
7d2e9d6
finish implementation of the file streaming service
i-norden Feb 11, 2021
8f72e00
streaming service unit tests; minor adjustments
i-norden Feb 18, 2021
613a4c5
streaming service constuctor, constructor unit test, update adr
i-norden Feb 22, 2021
c6ad1ed
example toml configuration
i-norden Feb 22, 2021
7198289
ci/linting fixes
i-norden Feb 22, 2021
2fffbd0
simapp integration
i-norden Mar 1, 2021
d398cbf
update changelog
i-norden Mar 3, 2021
e0a1f32
documentation for configuring and using a StreamingService
i-norden Mar 3, 2021
caf96a0
update to use new KVStorePair type
i-norden Mar 5, 2021
6fdb3c1
fix double cache wrap issue; prefer wrapping with listener vs tracer
i-norden Mar 30, 2021
921f289
review refactor
i-norden Apr 16, 2021
d5bbb0a
fix linting
i-norden Apr 16, 2021
901e62f
review fixes
i-norden Apr 20, 2021
6cf023f
adjustments after rebase
i-norden Jun 4, 2021
dfe63d8
Merge branch 'master' into adr038_streamingservice
i-norden Aug 10, 2021
20087a1
Merge branch 'master' into adr038_streamingservice
i-norden Sep 7, 2021
5af6d7e
Merge branch 'master' into adr038_streamingservice
tac0turtle Sep 16, 2021
3b0f7ef
Merge branch 'master' into adr038_streamingservice
i-norden Sep 20, 2021
d29f880
add state cache mutex to prevent race condition detection error; alth…
i-norden Sep 20, 2021
c72a44d
Merge branch 'master' into adr038_streamingservice
tac0turtle Sep 21, 2021
3fa194d
review updates
i-norden Sep 23, 2021
fe4c30e
Merge branch 'master' into adr038_streamingservice
i-norden Sep 23, 2021
7584a9f
Merge branch 'master' into adr038_streamingservice
i-norden Oct 1, 2021
942a4f4
review fixes
i-norden Oct 1, 2021
5d69858
skip finicky test in CI environment
i-norden Oct 5, 2021
ee0fa3d
Merge branch 'master' into adr038_streamingservice
i-norden Oct 5, 2021
1ff64ee
adjustments after rebase
i-norden Oct 5, 2021
de3c42a
add store.md section for listenkv
i-norden Oct 5, 2021
ba033ac
review adjustments
i-norden Oct 14, 2021
748981b
Merge branch 'master' into adr038_streamingservice
i-norden Oct 14, 2021
51a5485
review fixes
i-norden Oct 18, 2021
64cb321
Merge branch 'master' into adr038_streamingservice
i-norden Oct 18, 2021
35c7ad1
Merge branch 'master' into adr038_streamingservice
i-norden Oct 20, 2021
a8c74f6
not sure why the unit tests are failing to read from the github actio…
i-norden Oct 20, 2021
70451e8
Merge branch 'master' into adr038_streamingservice
i-norden Oct 20, 2021
20da072
Merge branch 'master' into adr038_streamingservice
i-norden Oct 21, 2021
4ab7ca7
Merge branch 'master' into adr038_streamingservice
i-norden Oct 22, 2021
7a4b4f2
Merge branch 'master' into adr038_streamingservice
tac0turtle Oct 24, 2021
ea352a7
Merge branch 'master' into adr038_streamingservice
tac0turtle Oct 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#9776](https://github.com/cosmos/cosmos-sdk/pull/9776) Add flag `staking-bond-denom` to specify the staking bond denomination value when initializing a new chain.
* [\#9533](https://github.com/cosmos/cosmos-sdk/pull/9533) Added a new gRPC method, `DenomOwners`, in `x/bank` to query for all account holders of a specific denomination.
* (bank) [\#9618](https://github.com/cosmos/cosmos-sdk/pull/9618) Update bank.Metadata: add URI and URIHash attributes.
* (store) [\#8664](https://github.com/cosmos/cosmos-sdk/pull/8664) Implementation of ADR-038 file StreamingService
* [\#9837](https://github.com/cosmos/cosmos-sdk/issues/9837) `--generate-only` flag will accept the keyname now.

### API Breaking Changes
Expand Down
31 changes: 28 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.streamingListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}

return res
}

Expand All @@ -215,6 +223,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.streamingListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}

return res
}

Expand Down Expand Up @@ -262,15 +277,25 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")

var res abci.ResponseDeliverTx
defer func() {
for _, streamingListener := range app.streamingListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
}()
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
res = sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
return res
}

ctx := app.getContextForTx(runTxModeDeliver, req.Tx)
res, err := app.txHandler.DeliverTx(ctx, tx, req)
res, err = app.txHandler.DeliverTx(ctx, tx, req)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
res = sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
return res
}

return res
Expand Down
4 changes: 4 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ type BaseApp struct { // nolint: maligned
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// StreamingListener for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
streamingListeners []StreamingListener
i-norden marked this conversation as resolved.
Show resolved Hide resolved
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.interfaceRegistry = registry
app.grpcQueryRouter.SetInterfaceRegistry(registry)
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
i-norden marked this conversation as resolved.
Show resolved Hide resolved
}
// register the streamingListeners within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
app.streamingListeners = append(app.streamingListeners, s)
}
30 changes: 30 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package baseapp

import (
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// StreamingListener interface used to hook into the ABCI message processing of the BaseApp
type StreamingListener interface {
i-norden marked this conversation as resolved.
Show resolved Hide resolved
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
i-norden marked this conversation as resolved.
Show resolved Hide resolved
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup, quitChan <-chan struct{})
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// StreamingListener interface for hooking into the ABCI messages from inside the BaseApp
StreamingListener
}
126 changes: 63 additions & 63 deletions docs/architecture/adr-038-state-listening.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
// delete bool indicates if it was a delete; true: delete, false: set
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
```
Expand Down Expand Up @@ -207,18 +207,18 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers.

```go
// Hook interface used to hook into the ABCI message processing of the BaseApp
type Hook interface {
ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages
ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) // update the steaming service with the latest EndBlock messages
ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) // update the steaming service with the latest DeliverTx messages
// StreamingListener interface used to hook into the ABCI message processing of the BaseApp
type StreamingListener interface {
ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error // update the streaming service with the latest BeginBlock messages
ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error// update the steaming service with the latest EndBlock messages
ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error // update the steaming service with the latest DeliverTx messages
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) // streaming service loop, awaits kv pairs and writes them to some destination stream or file
Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register
Hook
Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register
StreamingListener
}
```

Expand Down Expand Up @@ -384,8 +384,8 @@ using the provided `AppOptions` and TOML configuration fields.
We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:

```go
// RegisterStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) RegisterHooks(s StreamingService) {
// SetStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) SetStreamingService(s StreamingService) {
// set the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
Expand Down Expand Up @@ -482,60 +482,60 @@ We will also provide a mapping of the TOML `store.streamers` "file" configuratio
streaming service. In the future, as other streaming services are added, their constructors will be added here as well.

```go
// StreamingServiceConstructor is used to construct a streaming service
type StreamingServiceConstructor func(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error)
// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error)

// StreamingServiceType enum for specifying the type of StreamingService
type StreamingServiceType int
// ServiceType enum for specifying the type of StreamingService
type ServiceType int

const (
Unknown StreamingServiceType = iota
File
// add more in the future
Unknown ServiceType = iota
File
// add more in the future
)

// NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name
func NewStreamingServiceType(name string) StreamingServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}

// String returns the string name of a StreamingServiceType
func (sst StreamingServiceType) String() string {
switch sst {
case File:
return "file"
default:
return ""
}
}

// StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors
var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{
File: FileStreamingConstructor,
}

// NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name
func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, error) {
ssType := NewStreamingServiceType(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := StreamingServiceConstructorLookupTable[ssType]; ok {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService
func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.writeDir"))
return streaming.NewFileStreamingService(fileDir, filePrefix, keys), nil
// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name
func NewStreamingServiceType(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}

// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
default:
return ""
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: FileStreamingConstructor,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := NewStreamingServiceType(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// FileStreamingConstructor is the streaming.ServiceConstructor function for creating a FileStreamingService
func FileStreamingConstructor(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.writeDir"))
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}
```

Expand Down Expand Up @@ -564,8 +564,8 @@ func NewSimApp(
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
// get the store keys allowed to be exposed for this streaming service/state listeners
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName))
exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs))
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName)))
exposeStoreKeys := make([]storeTypes.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
Expand All @@ -577,15 +577,15 @@ func NewSimApp(
tmos.Exit(err.Error()) // or continue?
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
streamingService, err := constructor(appOpts, exposeStoreKeys)
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
tmos.Exit(err.Error())
}
// register the streaming service with the BaseApp
bApp.RegisterStreamingService(streamingService)
// waitgroup and quit channel for optional shutdown coordination of the streaming service
wg := new(sync.WaitGroup)
quitChan := new(chan struct{}))
quitChan := make(chan struct{}))
// kick off the background streaming service loop
streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
}
Expand Down
18 changes: 0 additions & 18 deletions docs/core/proto-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
- [Output](#cosmos.bank.v1beta1.Output)
- [Params](#cosmos.bank.v1beta1.Params)
- [SendEnabled](#cosmos.bank.v1beta1.SendEnabled)
- [Supply](#cosmos.bank.v1beta1.Supply)
i-norden marked this conversation as resolved.
Show resolved Hide resolved

- [cosmos/bank/v1beta1/genesis.proto](#cosmos/bank/v1beta1/genesis.proto)
- [Balance](#cosmos.bank.v1beta1.Balance)
Expand Down Expand Up @@ -1870,23 +1869,6 @@ sendable).




<a name="cosmos.bank.v1beta1.Supply"></a>

### Supply
Supply represents a struct that passively keeps track of the total supply
amounts in the network.
This message is deprecated now that supply is indexed by denom.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| `total` | [cosmos.base.v1beta1.Coin](#cosmos.base.v1beta1.Coin) | repeated | |





<!-- end messages -->

<!-- end enums -->
Expand Down
6 changes: 6 additions & 0 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
simappparams "github.com/cosmos/cosmos-sdk/simapp/params"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
Expand Down Expand Up @@ -215,6 +216,11 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// configure state listening capabilities using AppOptions
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
tmos.Exit(err.Error())
}

app := &SimApp{
BaseApp: bApp,
legacyAmino: legacyAmino,
Expand Down
13 changes: 6 additions & 7 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ func NewFromKVStore(

for key, store := range stores {
var cacheWrapped types.CacheWrap
if cms.TracingEnabled() {
switch {
fedekunze marked this conversation as resolved.
Show resolved Hide resolved
case cms.ListeningEnabled(key):
cacheWrapped = store.CacheWrapWithListeners(key, cms.listeners[key])
case cms.TracingEnabled():
i-norden marked this conversation as resolved.
Show resolved Hide resolved
cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
} else {
default:
cacheWrapped = store.CacheWrap()
}
if cms.ListeningEnabled(key) {
cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key])
} else {
cms.stores[key] = cacheWrapped
}
cms.stores[key] = cacheWrapped
}

return cms
Expand Down
Loading