Compare commits

...

2 Commits

1d84d12f75  handle updated accounts in shared map  2023-10-03 19:03:54 +08:00

    There's a possibility (though as yet unseen) that an updated account could be
    moved outside the domain of a bounded iterator, in which case it would not see
    the update after traversal. Previously this would have caused an error; this
    change prevents it from happening.

6ebf8471fb  guard delayed tx cache  2023-10-03 19:03:54 +08:00
2 changed files with 85 additions and 37 deletions
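
The hazard the first commit describes is easiest to see in miniature: with one map per worker, an update whose old half is observed by one bounded iterator and whose new half by another is never paired. Below is a minimal, runnable sketch of the fix, using simplified stand-in types that mirror the accountUpdateLens introduced in the diff below (the string keys and roots here are hypothetical placeholders):

package main

import (
	"fmt"
	"sync"
)

// accountUpdate pairs the two halves of an update: the old root seen while
// iterating trie A, and the new root seen while iterating trie B.
type accountUpdate struct {
	oldRoot string
	newRoot string
}

// updateLens guards a map shared by all workers, so the two halves are
// paired even when different workers observe them.
type updateLens struct {
	sync.Mutex
	state map[string]*accountUpdate
}

func (l *updateLens) update(fn func(map[string]*accountUpdate)) {
	l.Lock()
	defer l.Unlock()
	fn(l.state)
}

func main() {
	lens := updateLens{state: make(map[string]*accountUpdate)}
	var wg sync.WaitGroup
	wg.Add(2)
	// Worker 0 sees only the old half of the update...
	go func() {
		defer wg.Done()
		lens.update(func(m map[string]*accountUpdate) {
			if u, ok := m["acct"]; ok {
				u.oldRoot = "root-A"
			} else {
				m["acct"] = &accountUpdate{oldRoot: "root-A"}
			}
		})
	}()
	// ...while worker 1 sees only the new half.
	go func() {
		defer wg.Done()
		lens.update(func(m map[string]*accountUpdate) {
			if u, ok := m["acct"]; ok {
				u.newRoot = "root-B"
			} else {
				m["acct"] = &accountUpdate{newRoot: "root-B"}
			}
		})
	}()
	wg.Wait()
	fmt.Printf("%+v\n", *lens.state["acct"]) // both halves present
}

The closure-based update method keeps the lock scoped to the map access, so callers cannot forget to unlock.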

View File

@@ -69,8 +69,20 @@ type accountUpdate struct {
 	new     sdtypes.AccountWrapper
 	oldRoot common.Hash
 }
 type accountUpdateMap map[string]*accountUpdate
 
+type accountUpdateLens struct {
+	state accountUpdateMap
+	sync.Mutex
+}
+
+func (l *accountUpdateLens) update(fn func(accountUpdateMap)) {
+	l.Lock()
+	defer l.Unlock()
+	fn(l.state)
+}
+
 func appender[T any](to *[]T) func(T) error {
 	return func(a T) error {
 		*to = append(*to, a)
@@ -141,8 +153,11 @@ func (sdb *builder) WriteStateDiff(
 	subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
 	subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
 
+	updates := accountUpdateLens{
+		state: make(accountUpdateMap),
+	}
 	logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
-	// errgroup will cancel if any group fails
+	// errgroup will cancel if any worker fails
 	g, ctx := errgroup.WithContext(context.Background())
 	for i := uint(0); i < sdb.subtrieWorkers; i++ {
 		func(subdiv uint) {
@@ -152,12 +167,35 @@ func (sdb *builder) WriteStateDiff(
 				return sdb.processAccounts(ctx,
 					it, &it.SymmDiffState,
 					params.watchedAddressesLeafPaths,
-					nodeSink, ipldSink, logger,
+					nodeSink, ipldSink, &updates,
+					logger,
 				)
 			})
 		}(i)
 	}
-	return g.Wait()
+
+	if err = g.Wait(); err != nil {
+		return err
+	}
+	for key, update := range updates.state {
+		var storageDiff []sdtypes.StorageLeafNode
+		err := sdb.processStorageUpdates(
+			update.oldRoot, update.new.Account.Root,
+			appender(&storageDiff), ipldSink,
+		)
+		if err != nil {
+			return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
+		}
+
+		if err = nodeSink(sdtypes.StateLeafNode{
+			AccountWrapper: update.new,
+			StorageDiff:    storageDiff,
+		}); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // WriteStateDiff writes a statediff object to output sinks
@@ -191,7 +229,11 @@ func (sdb *builder) WriteStateSnapshot(
 			subiters[i] = tracker.Tracked(subiters[i])
 		}
 	}
-	// errgroup will cancel if any group fails
+	updates := accountUpdateLens{
+		state: make(accountUpdateMap),
+	}
+	// errgroup will cancel if any worker fails
 	g, ctx := errgroup.WithContext(context.Background())
 	for i := range subiters {
 		func(subdiv uint) {
@@ -200,7 +242,8 @@ func (sdb *builder) WriteStateSnapshot(
 				return sdb.processAccounts(ctx,
 					subiters[subdiv], &symdiff,
 					params.watchedAddressesLeafPaths,
-					nodeSink, ipldSink, log.DefaultLogger,
+					nodeSink, ipldSink, &updates,
+					log.DefaultLogger,
 				)
 			})
 		}(uint(i))
@@ -215,13 +258,13 @@ func (sdb *builder) processAccounts(
 	it trie.NodeIterator, symdiff *utils.SymmDiffState,
 	watchedAddressesLeafPaths [][]byte,
 	nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
+	updateLens *accountUpdateLens,
 	logger log.Logger,
 ) error {
 	logger.Trace("statediff/processAccounts BEGIN")
 	defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
 		time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
 
-	updates := make(accountUpdateMap)
 	// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
 	var prevBlob = it.NodeBlob()
 	for it.Next(true) {
@@ -245,12 +288,14 @@ func (sdb *builder) processAccounts(
 			copy(leafKey, it.LeafKey())
 
 			if symdiff.CommonPath() {
-				// If B also contains this leaf node, this is the old state of an updated account.
-				if update, ok := updates[string(leafKey)]; ok {
-					update.oldRoot = account.Root
-				} else {
-					updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
-				}
+				updateLens.update(func(updates accountUpdateMap) {
+					// If B also contains this leaf node, this is the old state of an updated account.
+					if update, ok := updates[string(leafKey)]; ok {
+						update.oldRoot = account.Root
+					} else {
+						updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
+					}
+				})
 			} else {
 				// This node was removed, meaning the account was deleted. Emit empty
 				// "removed" records for the state node and all storage slots.
@@ -270,12 +315,14 @@ func (sdb *builder) processAccounts(
 			}
 
 			if symdiff.CommonPath() {
-				// If A also contains this leaf node, this is the new state of an updated account.
-				if update, ok := updates[string(accountW.LeafKey)]; ok {
-					update.new = *accountW
-				} else {
-					updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
-				}
+				updateLens.update(func(updates accountUpdateMap) {
+					// If A also contains this leaf node, this is the new state of an updated account.
+					if update, ok := updates[string(accountW.LeafKey)]; ok {
+						update.new = *accountW
+					} else {
+						updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
+					}
+				})
 			} else { // account was created
 				err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
 				if err != nil {
@@ -318,24 +365,6 @@ func (sdb *builder) processAccounts(
 		}
 		prevBlob = nodeVal
 	}
-	for key, update := range updates {
-		var storageDiff []sdtypes.StorageLeafNode
-		err := sdb.processStorageUpdates(
-			update.oldRoot, update.new.Account.Root,
-			appender(&storageDiff), ipldSink,
-		)
-		if err != nil {
-			return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
-		}
-
-		if err = nodeSink(sdtypes.StateLeafNode{
-			AccountWrapper: update.new,
-			StorageDiff:    storageDiff,
-		}); err != nil {
-			return err
-		}
-	}
 	return it.Error()
 }
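
Taken together, these hunks move the emit loop out of processAccounts: workers now only record updates into the shared lens, and the single goroutine that survives g.Wait() drains the map. A self-contained sketch of that shape (stand-in work and types, not the repo's):

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Shared, mutex-guarded map standing in for accountUpdateLens.
	var mu sync.Mutex
	updates := make(map[string]string)

	// errgroup will cancel if any worker fails, mirroring the diff.
	g, _ := errgroup.WithContext(context.Background())
	for i := 0; i < 4; i++ {
		i := i
		g.Go(func() error {
			// Stand-in for processAccounts over one bounded subtrie.
			mu.Lock()
			updates[fmt.Sprintf("acct-%d", i)] = "updated"
			mu.Unlock()
			return nil
		})
	}

	// Once Wait returns, no worker touches the map, so the drain below
	// runs unlocked; this is what makes relocating the emit loop to
	// after g.Wait() safe.
	if err := g.Wait(); err != nil {
		fmt.Println("worker error:", err)
		return
	}
	for key, state := range updates {
		fmt.Println("emit", key, state) // stand-in for storage diff + nodeSink
	}
}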

View File

@@ -3,6 +3,7 @@ package sql
 import (
 	"context"
 	"reflect"
+	"sync"
 	"time"
 
 	"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
@@ -15,6 +16,7 @@ const copyFromCheckLimit = 100
 type DelayedTx struct {
 	cache []interface{}
 	db    Database
+	sync.RWMutex
 }
 
 type cachedStmt struct {
 	sql  string
@@ -27,6 +29,8 @@ type copyFrom struct {
 	rows [][]interface{}
 }
 
+type result int64
+
 func (cf *copyFrom) appendRows(rows [][]interface{}) {
 	cf.rows = append(cf.rows, rows...)
 }
@@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
 }
 
 func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
+	tx.RLock()
+	defer tx.RUnlock()
 	for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
 		prevCopy, ok := tx.cache[pos].(*copyFrom)
 		if ok && prevCopy.matches(tableName, columnNames) {
@@ -59,15 +65,19 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
 			"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
 		prevCopy.appendRows(rows)
 	} else {
+		tx.Lock()
 		tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
+		tx.Unlock()
 	}
 	return 0, nil
 }
 
 func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
+	tx.Lock()
 	tx.cache = append(tx.cache, cachedStmt{sql, args})
-	return nil, nil
+	defer tx.Unlock()
+	return result(0), nil
 }
 
 func (tx *DelayedTx) Commit(ctx context.Context) error {
@@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
 			rollback(ctx, base)
 		}
 	}()
+	tx.Lock()
+	defer tx.Unlock()
 	for _, item := range tx.cache {
 		switch item := item.(type) {
 		case *copyFrom:
@@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
 }
 
 func (tx *DelayedTx) Rollback(ctx context.Context) error {
+	tx.Lock()
+	defer tx.Unlock()
 	tx.cache = nil
 	return nil
 }
+
+// RowsAffected satisfies sql.Result
+func (r result) RowsAffected() (int64, error) {
+	return int64(r), nil
+}
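
The DelayedTx changes follow the standard recipe for guarding a slice-backed cache: embed a sync.RWMutex, take the write lock around every append and drain, and the read lock around scans like findPrevCopyFrom. A simplified, self-contained sketch of the pattern (hypothetical types, not the repo's API):

package main

import (
	"fmt"
	"sync"
)

// delayedTx is a stand-in for DelayedTx: an embedded RWMutex guards the
// cache, writers (Exec) take the write lock, scanners take the read lock,
// and Commit holds the write lock while draining.
type delayedTx struct {
	sync.RWMutex
	cache []string
}

func (tx *delayedTx) Exec(stmt string) {
	tx.Lock()
	defer tx.Unlock()
	tx.cache = append(tx.cache, stmt)
}

func (tx *delayedTx) Pending() int {
	tx.RLock()
	defer tx.RUnlock()
	return len(tx.cache)
}

func (tx *delayedTx) Commit() []string {
	tx.Lock()
	defer tx.Unlock()
	drained := tx.cache
	tx.cache = nil
	return drained
}

func main() {
	tx := &delayedTx{}
	var wg sync.WaitGroup
	// Without the locks, these concurrent appends would race on the slice
	// header, which is the hazard the second commit guards against.
	for i := 0; i < 8; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			tx.Exec(fmt.Sprintf("INSERT %d", i))
		}()
	}
	wg.Wait()
	fmt.Println("pending:", tx.Pending())
	fmt.Println("committed:", len(tx.Commit()))
}

The added result type gives Exec a non-nil Result to return, so a caller that invokes RowsAffected on it no longer hits a nil interface.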