161 lines
4.7 KiB
Go
161 lines
4.7 KiB
Go
package oe
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
abci "github.com/cometbft/cometbft/abci/types"
|
|
|
|
"cosmossdk.io/log"
|
|
)
|
|
|
|
// FinalizeBlockFunc is the function that is called by the OE to finalize the
|
|
// block. It is the same as the one in the ABCI app.
|
|
type FinalizeBlockFunc func(context.Context, *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error)
|
|
|
|
// OptimisticExecution is a struct that contains the OE context. It is used to
|
|
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
|
|
type OptimisticExecution struct {
|
|
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context
|
|
logger log.Logger
|
|
|
|
mtx sync.Mutex
|
|
stopCh chan struct{}
|
|
request *abci.RequestFinalizeBlock
|
|
response *abci.ResponseFinalizeBlock
|
|
err error
|
|
cancelFunc func() // cancel function for the context
|
|
initialized bool // A boolean value indicating whether the struct has been initialized
|
|
|
|
// debugging/testing options
|
|
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
|
|
}
|
|
|
|
// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
|
|
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution {
|
|
logger = logger.With(log.ModuleKey, "oe")
|
|
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn}
|
|
for _, opt := range opts {
|
|
opt(oe)
|
|
}
|
|
return oe
|
|
}
|
|
|
|
// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
|
|
// 0 to 100 that determines the percentage of OE that should be aborted.
|
|
// This is for testing purposes only and must not be used in production.
|
|
func WithAbortRate(rate int) func(*OptimisticExecution) {
|
|
return func(oe *OptimisticExecution) {
|
|
oe.abortRate = rate
|
|
}
|
|
}
|
|
|
|
// Reset resets the OE context. Must be called whenever we want to invalidate
|
|
// the current OE.
|
|
func (oe *OptimisticExecution) Reset() {
|
|
oe.mtx.Lock()
|
|
defer oe.mtx.Unlock()
|
|
oe.request = nil
|
|
oe.response = nil
|
|
oe.err = nil
|
|
oe.initialized = false
|
|
}
|
|
|
|
func (oe *OptimisticExecution) Enabled() bool {
|
|
return oe != nil
|
|
}
|
|
|
|
// Initialized returns true if the OE was initialized, meaning that it contains
|
|
// a request and it was run or it is running.
|
|
func (oe *OptimisticExecution) Initialized() bool {
|
|
if oe == nil {
|
|
return false
|
|
}
|
|
oe.mtx.Lock()
|
|
defer oe.mtx.Unlock()
|
|
|
|
return oe.initialized
|
|
}
|
|
|
|
// Execute initializes the OE and starts it in a goroutine.
|
|
func (oe *OptimisticExecution) Execute(req *abci.RequestProcessProposal) {
|
|
oe.mtx.Lock()
|
|
defer oe.mtx.Unlock()
|
|
|
|
oe.stopCh = make(chan struct{})
|
|
oe.request = &abci.RequestFinalizeBlock{
|
|
Txs: req.Txs,
|
|
DecidedLastCommit: req.ProposedLastCommit,
|
|
Misbehavior: req.Misbehavior,
|
|
Hash: req.Hash,
|
|
Height: req.Height,
|
|
Time: req.Time,
|
|
NextValidatorsHash: req.NextValidatorsHash,
|
|
ProposerAddress: req.ProposerAddress,
|
|
}
|
|
|
|
oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
oe.cancelFunc = cancel
|
|
oe.initialized = true
|
|
|
|
go func() {
|
|
start := time.Now()
|
|
resp, err := oe.finalizeBlockFunc(ctx, oe.request)
|
|
|
|
oe.mtx.Lock()
|
|
|
|
executionTime := time.Since(start)
|
|
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", oe.request.Height, "hash", hex.EncodeToString(oe.request.Hash))
|
|
oe.response, oe.err = resp, err
|
|
|
|
close(oe.stopCh)
|
|
oe.mtx.Unlock()
|
|
}()
|
|
}
|
|
|
|
// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
|
|
// the running OE. Returns true if the OE was aborted.
|
|
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool {
|
|
if oe == nil {
|
|
return false
|
|
}
|
|
|
|
oe.mtx.Lock()
|
|
defer oe.mtx.Unlock()
|
|
|
|
if !bytes.Equal(oe.request.Hash, reqHash) {
|
|
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
|
|
oe.cancelFunc()
|
|
return true
|
|
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
|
|
// this is for test purposes only, we can emulate a certain percentage of
|
|
// OE needed to be aborted.
|
|
oe.cancelFunc()
|
|
oe.logger.Error("OE aborted due to test abort rate")
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Abort aborts the OE unconditionally and waits for it to finish.
|
|
func (oe *OptimisticExecution) Abort() {
|
|
if oe == nil || oe.cancelFunc == nil {
|
|
return
|
|
}
|
|
|
|
oe.cancelFunc()
|
|
<-oe.stopCh
|
|
}
|
|
|
|
// WaitResult waits for the OE to finish and returns the result.
|
|
func (oe *OptimisticExecution) WaitResult() (*abci.ResponseFinalizeBlock, error) {
|
|
<-oe.stopCh
|
|
return oe.response, oe.err
|
|
}
|