fix(unorderedtx): issues reported in audit (#21467)
This commit is contained in:
parent
071aa508eb
commit
e9eaefa379
14
UPGRADING.md
14
UPGRADING.md
@ -277,6 +277,20 @@ If you are still using the legacy wiring, you must enable unordered transactions
|
||||
}
|
||||
```
|
||||
|
||||
* Create or update the App's `Preblocker()` method to call the unordered tx
|
||||
manager's `OnNewBlock()` method.
|
||||
|
||||
```go
|
||||
...
|
||||
app.SetPreblocker(app.PreBlocker)
|
||||
...
|
||||
|
||||
func (app *SimApp) PreBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) {
|
||||
app.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
|
||||
return app.ModuleManager.PreBlock(ctx, req)
|
||||
}
|
||||
```
|
||||
|
||||
* Create or update the App's `Close()` method to close the unordered tx manager.
|
||||
Note, this is critical as it ensures the manager's state is written to file
|
||||
such that when the node restarts, it can recover the state to provide replay
|
||||
|
||||
@ -292,36 +292,42 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
|
||||
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
|
||||
)
|
||||
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
|
||||
signerData, err := h.signerExtAdapter.GetSigners(memTx)
|
||||
if err != nil {
|
||||
// propagate the error to the caller
|
||||
resError = err
|
||||
return false
|
||||
}
|
||||
|
||||
// If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before
|
||||
// so we add them and continue given that we don't need to check the sequence.
|
||||
shouldAdd := true
|
||||
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
|
||||
isUnordered := ok && unorderedTx.GetUnordered()
|
||||
txSignersSeqs := make(map[string]uint64)
|
||||
for _, signer := range signerData {
|
||||
signerKey := string(signer.Signer)
|
||||
seq, ok := selectedTxsSignersSeqs[signerKey]
|
||||
if !ok {
|
||||
txSignersSeqs[signerKey] = signer.Sequence
|
||||
continue
|
||||
|
||||
// if the tx is unordered, we don't need to check the sequence, we just add it
|
||||
if !isUnordered {
|
||||
signerData, err := h.signerExtAdapter.GetSigners(memTx)
|
||||
if err != nil {
|
||||
// propagate the error to the caller
|
||||
resError = err
|
||||
return false
|
||||
}
|
||||
|
||||
// If we have seen this signer before in this block, we must make
|
||||
// sure that the current sequence is seq+1; otherwise is invalid
|
||||
// and we skip it.
|
||||
if seq+1 != signer.Sequence {
|
||||
shouldAdd = false
|
||||
break
|
||||
// If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before
|
||||
// so we add them and continue given that we don't need to check the sequence.
|
||||
shouldAdd := true
|
||||
for _, signer := range signerData {
|
||||
signerKey := string(signer.Signer)
|
||||
seq, ok := selectedTxsSignersSeqs[signerKey]
|
||||
if !ok {
|
||||
txSignersSeqs[signerKey] = signer.Sequence
|
||||
continue
|
||||
}
|
||||
|
||||
// If we have seen this signer before in this block, we must make
|
||||
// sure that the current sequence is seq+1; otherwise is invalid
|
||||
// and we skip it.
|
||||
if seq+1 != signer.Sequence {
|
||||
shouldAdd = false
|
||||
break
|
||||
}
|
||||
txSignersSeqs[signerKey] = signer.Sequence
|
||||
}
|
||||
if !shouldAdd {
|
||||
return true
|
||||
}
|
||||
txSignersSeqs[signerKey] = signer.Sequence
|
||||
}
|
||||
if !shouldAdd {
|
||||
return true
|
||||
}
|
||||
|
||||
// NOTE: Since transaction verification was already executed in CheckTx,
|
||||
@ -338,18 +344,21 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
|
||||
}
|
||||
|
||||
txsLen := len(h.txSelector.SelectedTxs(ctx))
|
||||
for sender, seq := range txSignersSeqs {
|
||||
// If txsLen != selectedTxsNums is true, it means that we've
|
||||
// added a new tx to the selected txs, so we need to update
|
||||
// the sequence of the sender.
|
||||
if txsLen != selectedTxsNums {
|
||||
selectedTxsSignersSeqs[sender] = seq
|
||||
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
|
||||
// The transaction hasn't been added but it passed the
|
||||
// verification, so we know that the sequence is correct.
|
||||
// So we set this sender's sequence to seq-1, in order
|
||||
// to avoid unnecessary calls to PrepareProposalVerifyTx.
|
||||
selectedTxsSignersSeqs[sender] = seq - 1
|
||||
// If the tx is unordered, we don't need to update the sender sequence.
|
||||
if !isUnordered {
|
||||
for sender, seq := range txSignersSeqs {
|
||||
// If txsLen != selectedTxsNums is true, it means that we've
|
||||
// added a new tx to the selected txs, so we need to update
|
||||
// the sequence of the sender.
|
||||
if txsLen != selectedTxsNums {
|
||||
selectedTxsSignersSeqs[sender] = seq
|
||||
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
|
||||
// The transaction hasn't been added but it passed the
|
||||
// verification, so we know that the sequence is correct.
|
||||
// So we set this sender's sequence to seq-1, in order
|
||||
// to avoid unnecessary calls to PrepareProposalVerifyTx.
|
||||
selectedTxsSignersSeqs[sender] = seq - 1
|
||||
}
|
||||
}
|
||||
}
|
||||
selectedTxsNums = txsLen
|
||||
|
||||
@ -173,6 +173,9 @@ func (a *App) Close() error {
|
||||
|
||||
// PreBlocker application updates every pre block
|
||||
func (a *App) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error {
|
||||
if a.UnorderedTxManager != nil {
|
||||
a.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
|
||||
}
|
||||
return a.ModuleManager.PreBlock(ctx)
|
||||
}
|
||||
|
||||
|
||||
@ -691,6 +691,7 @@ func (app *SimApp) Name() string { return app.BaseApp.Name() }
|
||||
|
||||
// PreBlocker application updates every pre block
|
||||
func (app *SimApp) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error {
|
||||
app.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
|
||||
return app.ModuleManager.PreBlock(ctx)
|
||||
}
|
||||
|
||||
|
||||
@ -223,6 +223,16 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error
|
||||
sender := sig.Signer.String()
|
||||
priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx)
|
||||
nonce := sig.Sequence
|
||||
|
||||
// if it's an unordered tx, we use the gas instead of the nonce
|
||||
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
|
||||
gasLimit, err := unordered.GetGasLimit()
|
||||
nonce = gasLimit
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
key := txMeta[C]{nonce: nonce, priority: priority, sender: sender}
|
||||
|
||||
senderIndex, ok := mp.senderIndices[sender]
|
||||
@ -459,6 +469,15 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
|
||||
sender := sig.Signer.String()
|
||||
nonce := sig.Sequence
|
||||
|
||||
// if it's an unordered tx, we use the gas instead of the nonce
|
||||
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
|
||||
gasLimit, err := unordered.GetGasLimit()
|
||||
nonce = gasLimit
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
scoreKey := txMeta[C]{nonce: nonce, sender: sender}
|
||||
score, ok := mp.scores[scoreKey]
|
||||
if !ok {
|
||||
|
||||
@ -145,6 +145,15 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
|
||||
snm.senders[sender] = senderTxs
|
||||
}
|
||||
|
||||
// if it's an unordered tx, we use the gas instead of the nonce
|
||||
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
|
||||
gasLimit, err := unordered.GetGasLimit()
|
||||
nonce = gasLimit
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
senderTxs.Set(nonce, tx)
|
||||
|
||||
key := txKey{nonce: nonce, address: sender}
|
||||
@ -227,6 +236,15 @@ func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error {
|
||||
sender := sdk.AccAddress(sig.PubKey.Address()).String()
|
||||
nonce := sig.Sequence
|
||||
|
||||
// if it's an unordered tx, we use the gas instead of the nonce
|
||||
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
|
||||
gasLimit, err := unordered.GetGasLimit()
|
||||
nonce = gasLimit
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
senderTxs, found := snm.senders[sender]
|
||||
if !found {
|
||||
return ErrTxNotFound
|
||||
|
||||
@ -57,7 +57,9 @@ type Manager struct {
|
||||
func NewManager(dataDir string) *Manager {
|
||||
path := filepath.Join(dataDir, dirName)
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
_ = os.Mkdir(path, os.ModePerm)
|
||||
if err = os.MkdirAll(path, os.ModePerm); err != nil {
|
||||
panic(fmt.Errorf("failed to create unordered txs directory: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
@ -157,18 +159,14 @@ func (m *Manager) OnNewBlock(blockTime time.Time) {
|
||||
m.blockCh <- blockTime
|
||||
}
|
||||
|
||||
func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) error) error {
|
||||
func (m *Manager) exportSnapshot(_ uint64, snapshotWriter func([]byte) error) error {
|
||||
var buf bytes.Buffer
|
||||
w := bufio.NewWriter(&buf)
|
||||
|
||||
keys := slices.SortedFunc(maps.Keys(m.txHashes), func(i, j TxHash) int { return bytes.Compare(i[:], j[:]) })
|
||||
timestamp := time.Unix(int64(height), 0)
|
||||
for _, txHash := range keys {
|
||||
timeoutTime := m.txHashes[txHash]
|
||||
if timestamp.After(timeoutTime) {
|
||||
// skip expired txs that have yet to be purged
|
||||
continue
|
||||
}
|
||||
|
||||
// right now we dont have access block time at this flow, so we would just include the expired txs
|
||||
// and let it be purge during purge loop
|
||||
chunk := unorderedTxToBytes(txHash, uint64(timeoutTime.Unix()))
|
||||
@ -185,8 +183,8 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro
|
||||
return snapshotWriter(buf.Bytes())
|
||||
}
|
||||
|
||||
// flushToFile writes all unexpired unordered transactions along with their TTL
|
||||
// to file, overwriting the existing file if it exists.
|
||||
// flushToFile writes all unordered transactions (including expired if not pruned yet)
|
||||
// along with their TTL to file, overwriting the existing file if it exists.
|
||||
func (m *Manager) flushToFile() error {
|
||||
f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName))
|
||||
if err != nil {
|
||||
@ -251,6 +249,8 @@ func (m *Manager) purgeLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// batchReceive receives block time from the channel until the context is done
|
||||
// or the channel is closed.
|
||||
func (m *Manager) batchReceive() (time.Time, bool) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@ -81,8 +81,9 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay
|
||||
|
||||
timestamp := binary.BigEndian.Uint64(payload[i+txHashSize : i+chunkSize])
|
||||
|
||||
// purge any expired txs
|
||||
if timestamp != 0 && timestamp > height {
|
||||
// add all txs, we don't care at this point if they are expired,
|
||||
// we'll let the purge loop handle that
|
||||
if timestamp != 0 {
|
||||
s.m.Add(txHash, time.Unix(int64(timestamp), 0))
|
||||
}
|
||||
|
||||
|
||||
@ -39,12 +39,20 @@ func TestSnapshotter(t *testing.T) {
|
||||
err = s.RestoreExtension(50, 2, pr)
|
||||
require.Error(t, err)
|
||||
|
||||
// restore with timestamp > timeout time which should result in no unordered txs synced
|
||||
// restore with timestamp > timeout time which should result in all unordered txs synced,
|
||||
// even the ones that have timed out.
|
||||
txm2 := unorderedtx.NewManager(dataDir)
|
||||
s2 := unorderedtx.NewSnapshotter(txm2)
|
||||
err = s2.RestoreExtension(uint64(currentTime.Add(time.Second*200).Unix()), unorderedtx.SnapshotFormat, pr)
|
||||
err = s2.RestoreExtension(1, unorderedtx.SnapshotFormat, pr)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, txm2.Size())
|
||||
require.Equal(t, 100, txm2.Size())
|
||||
|
||||
// start the manager and wait a bit for the background purge loop to run
|
||||
txm2.Start()
|
||||
time.Sleep(time.Millisecond * 5)
|
||||
txm2.OnNewBlock(currentTime.Add(time.Second * 200))
|
||||
time.Sleep(time.Second * 5) // the loop runs every 5 seconds, so we need to wait for that
|
||||
require.Equal(t, 0, txm2.Size())
|
||||
|
||||
// restore with timestamp < timeout time which should result in all unordered txs synced
|
||||
txm3 := unorderedtx.NewManager(dataDir)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user