From 0ee214988544ebf9c72d19a98572dd3f32880bca Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 3 Oct 2023 18:34:13 +0800 Subject: [PATCH 1/3] Synchronize cache in DelayedTx Fixes a potential but unseen race condition. --- indexer/database/sql/lazy_tx.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/indexer/database/sql/lazy_tx.go b/indexer/database/sql/lazy_tx.go index d34d8ae..701b7cb 100644 --- a/indexer/database/sql/lazy_tx.go +++ b/indexer/database/sql/lazy_tx.go @@ -3,6 +3,7 @@ package sql import ( "context" "reflect" + "sync" "time" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" @@ -15,6 +16,7 @@ const copyFromCheckLimit = 100 type DelayedTx struct { cache []interface{} db Database + sync.RWMutex } type cachedStmt struct { sql string @@ -27,6 +29,8 @@ type copyFrom struct { rows [][]interface{} } +type result int64 + func (cf *copyFrom) appendRows(rows [][]interface{}) { cf.rows = append(cf.rows, rows...) } @@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface } func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) { + tx.RLock() + defer tx.RUnlock() for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { prevCopy, ok := tx.cache[pos].(*copyFrom) if ok && prevCopy.matches(tableName, columnNames) { @@ -59,6 +65,8 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam "current", len(prevCopy.rows), "new", len(rows), "distance", distance) prevCopy.appendRows(rows) } else { + tx.Lock() + defer tx.Unlock() tx.cache = append(tx.cache, ©From{tableName, columnNames, rows}) } @@ -66,8 +74,10 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam } func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { + tx.Lock() + defer tx.Unlock() tx.cache = append(tx.cache, cachedStmt{sql, args}) - return nil, nil + return result(0), nil } func (tx *DelayedTx) Commit(ctx context.Context) error { @@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { rollback(ctx, base) } }() + tx.Lock() + defer tx.Unlock() for _, item := range tx.cache { switch item := item.(type) { case *copyFrom: @@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { } func (tx *DelayedTx) Rollback(ctx context.Context) error { + tx.Lock() + defer tx.Unlock() tx.cache = nil return nil } + +// RowsAffected satisfies sql.Result +func (r result) RowsAffected() (int64, error) { + return int64(r), nil +} -- 2.45.2 From 09b066b1a308c5cb4a5d89c0853cdbbd84a8f9f6 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 28 Sep 2023 11:48:15 +0800 Subject: [PATCH 2/3] Add godoc for Indexer --- indexer/interfaces/interfaces.go | 26 +++++++++++++++++++------- main/main.go | 2 +- test_helpers/builder.go | 7 ++++--- test_helpers/indexing.go | 9 +++++---- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/indexer/interfaces/interfaces.go b/indexer/interfaces/interfaces.go index 18dc735..e2e70d8 100644 --- a/indexer/interfaces/interfaces.go +++ b/indexer/interfaces/interfaces.go @@ -29,27 +29,39 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// StateDiffIndexer interface required to index statediff data +// StateDiffIndexer describes the interface for indexing state data. type StateDiffIndexer interface { - DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error) - CurrentBlock() (*models.HeaderModel, error) - HasBlock(hash common.Hash, number uint64) (bool, error) + // PushBlock indexes block data except for state & storage nodes: header, uncles, transactions & + // receipts. Returns an initiated DB transaction which must be committed or rolled back. PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) + // PushHeader indexes a block header. PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error) + // PushStateNode indexes a state node and its storage trie. PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error + // PushIPLD indexes an IPLD node. PushIPLD(tx Batch, ipld sdtypes.IPLD) error - ReportDBMetrics(delay time.Duration, quit <-chan bool) - + // BeginTx starts a new DB transaction. BeginTx(number *big.Int, ctx context.Context) Batch + // DetectGaps returns a list of gaps in the block range, if any. + DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error) + // CurrentBlock returns the latest indexed block. + CurrentBlock() (*models.HeaderModel, error) + // HasBlock returns true if the block is indexed. + HasBlock(hash common.Hash, number uint64) (bool, error) + + // Close closes the associated output DB connection or files. + Close() error + // Methods used by WatchAddress API/functionality + LoadWatchedAddresses() ([]common.Address, error) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error ClearWatchedAddresses() error - Close() error + ReportDBMetrics(delay time.Duration, quit <-chan bool) } // Batch required for indexing data atomically diff --git a/main/main.go b/main/main.go index 61cce90..6192c19 100644 --- a/main/main.go +++ b/main/main.go @@ -26,7 +26,7 @@ func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) { gethContext = ctx } -func InitializeNode(stack core.Node, b core.Backend) { +func InitializeNode(_ core.Node, b core.Backend) { backend := b.(restricted.Backend) networkid, err := strconv.ParseUint(gethContext.String(geth_flags.NetworkIdFlag.Name), 10, 64) diff --git a/test_helpers/builder.go b/test_helpers/builder.go index 18a9363..e641bcb 100644 --- a/test_helpers/builder.go +++ b/test_helpers/builder.go @@ -12,14 +12,15 @@ import ( "testing" "github.com/cerc-io/eth-iterator-utils/tracker" - statediff "github.com/cerc-io/plugeth-statediff" - "github.com/cerc-io/plugeth-statediff/adapt" - sdtypes "github.com/cerc-io/plugeth-statediff/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" + + "github.com/cerc-io/plugeth-statediff" + "github.com/cerc-io/plugeth-statediff/adapt" + sdtypes "github.com/cerc-io/plugeth-statediff/types" ) var subtrieCounts = []uint{1, 8, 32} diff --git a/test_helpers/indexing.go b/test_helpers/indexing.go index 5c34b58..d738a76 100644 --- a/test_helpers/indexing.go +++ b/test_helpers/indexing.go @@ -5,15 +5,16 @@ import ( "fmt" "math/big" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" + "github.com/cerc-io/plugeth-statediff" "github.com/cerc-io/plugeth-statediff/adapt" "github.com/cerc-io/plugeth-statediff/indexer" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/node" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/params" ) type IndexChainParams struct { -- 2.45.2 From 801979ea2de05856c4b7a9fc4b7b2ebd82068ef9 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 10 Jul 2024 18:05:25 +0800 Subject: [PATCH 3/3] Don't run ipld-eth-server in stack and remove unused stack.yml --- scripts/run-test-stack.sh | 7 ++++--- test/stack.yml | 20 -------------------- 2 files changed, 4 insertions(+), 23 deletions(-) delete mode 100644 test/stack.yml diff --git a/scripts/run-test-stack.sh b/scripts/run-test-stack.sh index 29cb096..45b783d 100755 --- a/scripts/run-test-stack.sh +++ b/scripts/run-test-stack.sh @@ -18,14 +18,15 @@ echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env if [[ -z $SKIP_BUILD ]]; then + # Assume the tested image has been built separately. $laconic_so setup-repositories \ - --exclude git.vdb.to/cerc-io/plugeth-statediff - # Assume the tested image has been built separately + --exclude git.vdb.to/cerc-io/plugeth-statediff,git.vdb.to/cerc-io/ipld-eth-server $laconic_so build-containers \ - --exclude cerc/plugeth-statediff + --exclude cerc/plugeth-statediff,cerc/ipld-eth-server fi if ! $laconic_so deploy \ + --exclude ipld-eth-server \ --env-file $CONFIG_DIR/stack.env \ --cluster test up then diff --git a/test/stack.yml b/test/stack.yml deleted file mode 100644 index e52610d..0000000 --- a/test/stack.yml +++ /dev/null @@ -1,20 +0,0 @@ -version: "1.2" -name: fixturenet-plugeth-tx -description: "Plugeth Ethereum Fixturenet for testing plugeth-statediff" -repos: - - git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2 - - git.vdb.to/cerc-io/plugeth-statediff - - git.vdb.to/cerc-io/lighthouse - - git.vdb.to/cerc-io/ipld-eth-db@v5.4.0-alpha -containers: - - cerc/plugeth-statediff - - cerc/plugeth - - cerc/fixturenet-eth-genesis - - cerc/fixturenet-plugeth-plugeth - - cerc/lighthouse - - cerc/lighthouse-cli - - cerc/fixturenet-eth-lighthouse - - cerc/ipld-eth-db -pods: - - fixturenet-plugeth - - ipld-eth-db -- 2.45.2