From 27cc99b9b2bbe2a7e1b913e60db79fe7305fb0db Mon Sep 17 00:00:00 2001 From: yihuang Date: Tue, 27 Feb 2024 16:18:29 +0800 Subject: [PATCH] TxExecutor baseapp option, TxIndex/MsgIndex in context --- CHANGELOG.md | 1 + baseapp/abci.go | 71 +++++++++++++++++++++++++---------------- baseapp/baseapp.go | 27 +++++++++++++--- baseapp/genesis.go | 2 +- baseapp/options.go | 10 ++++++ baseapp/test_helpers.go | 4 +-- baseapp/txexecutor.go | 15 +++++++++ types/context.go | 15 +++++++++ 8 files changed, 109 insertions(+), 36 deletions(-) create mode 100644 baseapp/txexecutor.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a073a86882cd..126dc510d709 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i * (types) [#18768](https://github.com/cosmos/cosmos-sdk/pull/18768) Add MustValAddressFromBech32 function. * (gRPC) [#19049](https://github.com/cosmos/cosmos-sdk/pull/19049) Add debug log prints for each gRPC request. * (x/consensus) [#19483](https://github.com/cosmos/cosmos-sdk/pull/19483) Add consensus messages registration to consensus module. +* (baseapp) [#]() Add TxExecutor option, and `TxIndex`/`MsgIndex` to context. ### Improvements diff --git a/baseapp/abci.go b/baseapp/abci.go index e54824f41e0f..e5f7a57be73a 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -803,34 +803,10 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request // // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. - txResults := make([]*abci.ExecTxResult, 0, len(req.Txs)) - for _, rawTx := range req.Txs { - var response *abci.ExecTxResult - - if _, err := app.txDecoder(rawTx); err == nil { - response = app.deliverTx(rawTx) - } else { - // In the case where a transaction included in a block proposal is malformed, - // we still want to return a default response to comet. This is because comet - // expects a response for each transaction included in a block proposal. - response = sdkerrors.ResponseExecTxResultWithEvents( - sdkerrors.ErrTxDecode, - 0, - 0, - nil, - false, - ) - } - - // check after every tx if we should abort - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue - } - - txResults = append(txResults, response) + txResults, err := app.executeTxs(ctx, req.Txs) + if err != nil { + // usually due to cancelled + return nil, err } if app.finalizeBlockState.ms.TracingEnabled() { @@ -861,6 +837,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request }, nil } +func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) { + if app.txExecutor != nil { + return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(txs[i], i, ms) + }) + } + + txResults := make([]*abci.ExecTxResult, 0, len(txs)) + for i, rawTx := range txs { + var response *abci.ExecTxResult + + if _, err := app.txDecoder(rawTx); err == nil { + response = app.deliverTx(rawTx, i) + } else { + // In the case where a transaction included in a block proposal is malformed, + // we still want to return a default response to comet. This is because comet + // expects a response for each transaction included in a block proposal. + response = sdkerrors.ResponseExecTxResultWithEvents( + sdkerrors.ErrTxDecode, + 0, + 0, + nil, + false, + ) + } + + // check after every tx if we should abort + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + + txResults = append(txResults, response) + } + return txResults, nil +} + // FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. // Specifically, it will execute an application's BeginBlock (if defined), followed // by the transactions in the proposal, finally followed by the application's diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 2cef05e7ccd5..c60592cfe039 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -185,6 +185,9 @@ type BaseApp struct { // including the goroutine handling.This is experimental and must be enabled // by developers. optimisticExec *oe.OptimisticExecution + + // Optional alternative tx executor, used for block-stm parallel transaction execution. + txExecutor TxExecutor } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -657,7 +660,7 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { } // retrieve the context for the tx w/ txBytes and other memoized values. -func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { +func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context { app.mu.Lock() defer app.mu.Unlock() @@ -666,7 +669,8 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { panic(fmt.Sprintf("state is nil for mode %v", mode)) } ctx := modeState.Context(). - WithTxBytes(txBytes) + WithTxBytes(txBytes). + WithTxIndex(txIndex) // WithVoteInfos(app.voteInfos) // TODO: identify if this is needed ctx = ctx.WithIsSigverifyTx(app.sigverifyTx) @@ -746,7 +750,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock, return resp, nil } -func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { +func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(tx, txIndex, nil) +} + +func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult { gInfo := sdk.GasInfo{} resultStr := "successful" @@ -759,7 +767,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted") }() - gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx) + gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore) if err != nil { resultStr = "failed" resp = sdkerrors.ResponseExecTxResultWithEvents( @@ -817,12 +825,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) { // returned if the tx does not run out of gas and if all the messages are valid // and execute successfully. An error is returned otherwise. func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { + return app.runTxWithMultiStore(mode, txBytes, -1, nil) +} + +func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is // determined by the GasMeter. We need access to the context to get the gas // meter, so we initialize upfront. var gasWanted uint64 - ctx := app.getContextForTx(mode, txBytes) + ctx := app.getContextForTx(mode, txBytes, txIndex) + if txMultiStore != nil { + ctx = ctx.WithMultiStore(txMultiStore) + } ms := ctx.MultiStore() // only run the tx if there is block gas remaining @@ -996,6 +1011,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me break } + ctx = ctx.WithMsgIndex(i) + handler := app.msgServiceRouter.Handler(msg) if handler == nil { return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg) diff --git a/baseapp/genesis.go b/baseapp/genesis.go index 4662d1187b4a..2e5455adc3f5 100644 --- a/baseapp/genesis.go +++ b/baseapp/genesis.go @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil) // ExecuteGenesisTx implements genesis.GenesisState from // cosmossdk.io/core/genesis to set initial state in genesis func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error { - res := ba.deliverTx(tx) + res := ba.deliverTx(tx, -1) if res.Code != types.CodeTypeOK { return errors.New(res.Log) diff --git a/baseapp/options.go b/baseapp/options.go index 372c2411cf82..d74439b64bf9 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -119,6 +119,11 @@ func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp } } +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func SetTxExecutor(executor TxExecutor) func(*BaseApp) { + return func(app *BaseApp) { app.txExecutor = executor } +} + func (app *BaseApp) SetName(name string) { if app.sealed { panic("SetName() on sealed BaseApp") @@ -395,3 +400,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) { func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) { app.grpcQueryRouter = grpcQueryRouter } + +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func (app *BaseApp) SetTxExecutor(executor TxExecutor) { + app.txExecutor = executor +} diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index 846879b9562d..690146653e49 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -60,9 +60,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s } func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeFinalize, txBytes) + return app.getContextForTx(execModeFinalize, txBytes, -1) } func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeCheck, txBytes) + return app.getContextForTx(execModeCheck, txBytes, -1) } diff --git a/baseapp/txexecutor.go b/baseapp/txexecutor.go new file mode 100644 index 000000000000..2eee8c6a5451 --- /dev/null +++ b/baseapp/txexecutor.go @@ -0,0 +1,15 @@ +package baseapp + +import ( + "context" + + storetypes "cosmossdk.io/store/types" + abci "github.com/cometbft/cometbft/abci/types" +) + +type TxExecutor func( + ctx context.Context, + blockSize int, + cms storetypes.MultiStore, + deliverTxWithMultiStore func(int, storetypes.MultiStore) *abci.ExecTxResult, +) ([]*abci.ExecTxResult, error) diff --git a/types/context.go b/types/context.go index 2c63445f8709..e1cd28a0aee7 100644 --- a/types/context.go +++ b/types/context.go @@ -62,6 +62,9 @@ type Context struct { streamingManager storetypes.StreamingManager cometInfo comet.Info headerInfo header.Info + + txIndex int + msgIndex int } // Proposed rename, not done to avoid API breakage @@ -90,6 +93,8 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager } func (c Context) CometInfo() comet.Info { return c.cometInfo } func (c Context) HeaderInfo() header.Info { return c.headerInfo } +func (c Context) TxIndex() int { return c.txIndex } +func (c Context) MsgIndex() int { return c.msgIndex } // clone the header before returning func (c Context) BlockHeader() cmtproto.Header { @@ -314,6 +319,16 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context { return c } +func (c Context) WithTxIndex(txIndex int) Context { + c.txIndex = txIndex + return c +} + +func (c Context) WithMsgIndex(msgIndex int) Context { + c.msgIndex = msgIndex + return c +} + // TODO: remove??? func (c Context) IsZero() bool { return c.ms == nil