Compare commits
2 Commits
main
...
roysc/conc
Author | SHA1 | Date | |
---|---|---|---|
1d84d12f75 | |||
6ebf8471fb |
77
builder.go
77
builder.go
@ -69,8 +69,20 @@ type accountUpdate struct {
|
|||||||
new sdtypes.AccountWrapper
|
new sdtypes.AccountWrapper
|
||||||
oldRoot common.Hash
|
oldRoot common.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
type accountUpdateMap map[string]*accountUpdate
|
type accountUpdateMap map[string]*accountUpdate
|
||||||
|
|
||||||
|
type accountUpdateLens struct {
|
||||||
|
state accountUpdateMap
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *accountUpdateLens) update(fn func(accountUpdateMap)) {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
fn(l.state)
|
||||||
|
}
|
||||||
|
|
||||||
func appender[T any](to *[]T) func(T) error {
|
func appender[T any](to *[]T) func(T) error {
|
||||||
return func(a T) error {
|
return func(a T) error {
|
||||||
*to = append(*to, a)
|
*to = append(*to, a)
|
||||||
@ -141,8 +153,11 @@ func (sdb *builder) WriteStateDiff(
|
|||||||
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
|
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
|
||||||
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
|
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
|
||||||
|
|
||||||
|
updates := accountUpdateLens{
|
||||||
|
state: make(accountUpdateMap),
|
||||||
|
}
|
||||||
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
|
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
|
||||||
// errgroup will cancel if any group fails
|
// errgroup will cancel if any worker fails
|
||||||
g, ctx := errgroup.WithContext(context.Background())
|
g, ctx := errgroup.WithContext(context.Background())
|
||||||
for i := uint(0); i < sdb.subtrieWorkers; i++ {
|
for i := uint(0); i < sdb.subtrieWorkers; i++ {
|
||||||
func(subdiv uint) {
|
func(subdiv uint) {
|
||||||
@ -152,12 +167,35 @@ func (sdb *builder) WriteStateDiff(
|
|||||||
return sdb.processAccounts(ctx,
|
return sdb.processAccounts(ctx,
|
||||||
it, &it.SymmDiffState,
|
it, &it.SymmDiffState,
|
||||||
params.watchedAddressesLeafPaths,
|
params.watchedAddressesLeafPaths,
|
||||||
nodeSink, ipldSink, logger,
|
nodeSink, ipldSink, &updates,
|
||||||
|
logger,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
return g.Wait()
|
|
||||||
|
if err = g.Wait(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, update := range updates.state {
|
||||||
|
var storageDiff []sdtypes.StorageLeafNode
|
||||||
|
err := sdb.processStorageUpdates(
|
||||||
|
update.oldRoot, update.new.Account.Root,
|
||||||
|
appender(&storageDiff), ipldSink,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = nodeSink(sdtypes.StateLeafNode{
|
||||||
|
AccountWrapper: update.new,
|
||||||
|
StorageDiff: storageDiff,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteStateDiff writes a statediff object to output sinks
|
// WriteStateDiff writes a statediff object to output sinks
|
||||||
@ -191,7 +229,11 @@ func (sdb *builder) WriteStateSnapshot(
|
|||||||
subiters[i] = tracker.Tracked(subiters[i])
|
subiters[i] = tracker.Tracked(subiters[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// errgroup will cancel if any group fails
|
updates := accountUpdateLens{
|
||||||
|
state: make(accountUpdateMap),
|
||||||
|
}
|
||||||
|
|
||||||
|
// errgroup will cancel if any worker fails
|
||||||
g, ctx := errgroup.WithContext(context.Background())
|
g, ctx := errgroup.WithContext(context.Background())
|
||||||
for i := range subiters {
|
for i := range subiters {
|
||||||
func(subdiv uint) {
|
func(subdiv uint) {
|
||||||
@ -200,7 +242,8 @@ func (sdb *builder) WriteStateSnapshot(
|
|||||||
return sdb.processAccounts(ctx,
|
return sdb.processAccounts(ctx,
|
||||||
subiters[subdiv], &symdiff,
|
subiters[subdiv], &symdiff,
|
||||||
params.watchedAddressesLeafPaths,
|
params.watchedAddressesLeafPaths,
|
||||||
nodeSink, ipldSink, log.DefaultLogger,
|
nodeSink, ipldSink, &updates,
|
||||||
|
log.DefaultLogger,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}(uint(i))
|
}(uint(i))
|
||||||
@ -215,13 +258,13 @@ func (sdb *builder) processAccounts(
|
|||||||
it trie.NodeIterator, symdiff *utils.SymmDiffState,
|
it trie.NodeIterator, symdiff *utils.SymmDiffState,
|
||||||
watchedAddressesLeafPaths [][]byte,
|
watchedAddressesLeafPaths [][]byte,
|
||||||
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
|
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
|
||||||
|
updateLens *accountUpdateLens,
|
||||||
logger log.Logger,
|
logger log.Logger,
|
||||||
) error {
|
) error {
|
||||||
logger.Trace("statediff/processAccounts BEGIN")
|
logger.Trace("statediff/processAccounts BEGIN")
|
||||||
defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
|
defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
|
||||||
time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
|
time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
|
||||||
|
|
||||||
updates := make(accountUpdateMap)
|
|
||||||
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
|
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
|
||||||
var prevBlob = it.NodeBlob()
|
var prevBlob = it.NodeBlob()
|
||||||
for it.Next(true) {
|
for it.Next(true) {
|
||||||
@ -245,12 +288,14 @@ func (sdb *builder) processAccounts(
|
|||||||
copy(leafKey, it.LeafKey())
|
copy(leafKey, it.LeafKey())
|
||||||
|
|
||||||
if symdiff.CommonPath() {
|
if symdiff.CommonPath() {
|
||||||
|
updateLens.update(func(updates accountUpdateMap) {
|
||||||
// If B also contains this leaf node, this is the old state of an updated account.
|
// If B also contains this leaf node, this is the old state of an updated account.
|
||||||
if update, ok := updates[string(leafKey)]; ok {
|
if update, ok := updates[string(leafKey)]; ok {
|
||||||
update.oldRoot = account.Root
|
update.oldRoot = account.Root
|
||||||
} else {
|
} else {
|
||||||
updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
|
updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
// This node was removed, meaning the account was deleted. Emit empty
|
// This node was removed, meaning the account was deleted. Emit empty
|
||||||
// "removed" records for the state node and all storage all storage slots.
|
// "removed" records for the state node and all storage all storage slots.
|
||||||
@ -270,12 +315,14 @@ func (sdb *builder) processAccounts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if symdiff.CommonPath() {
|
if symdiff.CommonPath() {
|
||||||
|
updateLens.update(func(updates accountUpdateMap) {
|
||||||
// If A also contains this leaf node, this is the new state of an updated account.
|
// If A also contains this leaf node, this is the new state of an updated account.
|
||||||
if update, ok := updates[string(accountW.LeafKey)]; ok {
|
if update, ok := updates[string(accountW.LeafKey)]; ok {
|
||||||
update.new = *accountW
|
update.new = *accountW
|
||||||
} else {
|
} else {
|
||||||
updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
|
updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
} else { // account was created
|
} else { // account was created
|
||||||
err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
|
err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -318,24 +365,6 @@ func (sdb *builder) processAccounts(
|
|||||||
}
|
}
|
||||||
prevBlob = nodeVal
|
prevBlob = nodeVal
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, update := range updates {
|
|
||||||
var storageDiff []sdtypes.StorageLeafNode
|
|
||||||
err := sdb.processStorageUpdates(
|
|
||||||
update.oldRoot, update.new.Account.Root,
|
|
||||||
appender(&storageDiff), ipldSink,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = nodeSink(sdtypes.StateLeafNode{
|
|
||||||
AccountWrapper: update.new,
|
|
||||||
StorageDiff: storageDiff,
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return it.Error()
|
return it.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package sql
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
|
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
|
||||||
@ -15,6 +16,7 @@ const copyFromCheckLimit = 100
|
|||||||
type DelayedTx struct {
|
type DelayedTx struct {
|
||||||
cache []interface{}
|
cache []interface{}
|
||||||
db Database
|
db Database
|
||||||
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
type cachedStmt struct {
|
type cachedStmt struct {
|
||||||
sql string
|
sql string
|
||||||
@ -27,6 +29,8 @@ type copyFrom struct {
|
|||||||
rows [][]interface{}
|
rows [][]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type result int64
|
||||||
|
|
||||||
func (cf *copyFrom) appendRows(rows [][]interface{}) {
|
func (cf *copyFrom) appendRows(rows [][]interface{}) {
|
||||||
cf.rows = append(cf.rows, rows...)
|
cf.rows = append(cf.rows, rows...)
|
||||||
}
|
}
|
||||||
@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
||||||
|
tx.RLock()
|
||||||
|
defer tx.RUnlock()
|
||||||
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
||||||
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
||||||
if ok && prevCopy.matches(tableName, columnNames) {
|
if ok && prevCopy.matches(tableName, columnNames) {
|
||||||
@ -59,15 +65,19 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
|
|||||||
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
||||||
prevCopy.appendRows(rows)
|
prevCopy.appendRows(rows)
|
||||||
} else {
|
} else {
|
||||||
|
tx.Lock()
|
||||||
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
||||||
|
tx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
||||||
|
tx.Lock()
|
||||||
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
||||||
return nil, nil
|
defer tx.Unlock()
|
||||||
|
return result(0), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
||||||
@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
rollback(ctx, base)
|
rollback(ctx, base)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
for _, item := range tx.cache {
|
for _, item := range tx.cache {
|
||||||
switch item := item.(type) {
|
switch item := item.(type) {
|
||||||
case *copyFrom:
|
case *copyFrom:
|
||||||
@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
tx.cache = nil
|
tx.cache = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RowsAffected satisfies sql.Result
|
||||||
|
func (r result) RowsAffected() (int64, error) {
|
||||||
|
return int64(r), nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user