WIP: Synchronize DelayedTx cache, clean up #31

Draft
roysc wants to merge 3 commits from cleanup into main
7 changed files with 53 additions and 39 deletions

View File

@@ -3,6 +3,7 @@ package sql
 import (
 	"context"
 	"reflect"
+	"sync"
 	"time"

 	"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
@@ -15,6 +16,7 @@ const copyFromCheckLimit = 100

 type DelayedTx struct {
 	cache []interface{}
 	db    Database
+	sync.RWMutex
 }

 type cachedStmt struct {
 	sql string
@@ -27,6 +29,8 @@ type copyFrom struct {
 	rows [][]interface{}
 }

+type result int64
+
 func (cf *copyFrom) appendRows(rows [][]interface{}) {
 	cf.rows = append(cf.rows, rows...)
 }
@@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
 }

 func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
+	tx.RLock()
+	defer tx.RUnlock()
 	for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
 		prevCopy, ok := tx.cache[pos].(*copyFrom)
 		if ok && prevCopy.matches(tableName, columnNames) {
@@ -59,6 +65,8 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
 			"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
 		prevCopy.appendRows(rows)
 	} else {
+		tx.Lock()
+		defer tx.Unlock()
 		tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
 	}
@@ -66,8 +74,10 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
 }

 func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
+	tx.Lock()
+	defer tx.Unlock()
 	tx.cache = append(tx.cache, cachedStmt{sql, args})
-	return nil, nil
+	return result(0), nil
 }

 func (tx *DelayedTx) Commit(ctx context.Context) error {
@@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
 			rollback(ctx, base)
 		}
 	}()
+	tx.Lock()
+	defer tx.Unlock()
 	for _, item := range tx.cache {
 		switch item := item.(type) {
 		case *copyFrom:
@@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
 }

 func (tx *DelayedTx) Rollback(ctx context.Context) error {
+	tx.Lock()
+	defer tx.Unlock()
 	tx.cache = nil
 	return nil
 }
+
+// RowsAffected satisfies sql.Result
+func (r result) RowsAffected() (int64, error) {
+	return int64(r), nil
+}
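
For context, a minimal usage sketch (not part of this diff) of what the new lock is meant to allow: several goroutines queueing statements on one shared DelayedTx, followed by a single Commit. The helper name and SQL text are hypothetical, and it assumes placement in the same sql package.

```go
package sql

import (
	"context"
	"sync"
)

// exampleConcurrentQueue is a hypothetical helper: with the embedded
// sync.RWMutex, concurrent Exec calls can append to the shared cache
// without racing; nothing is sent to the DB until Commit.
func exampleConcurrentQueue(ctx context.Context, tx *DelayedTx) error {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Exec only caches the statement under the write lock.
			_, _ = tx.Exec(ctx, "UPDATE example SET n = $1", n)
		}(i)
	}
	wg.Wait()
	return tx.Commit(ctx) // flushes the cached statements in order
}
```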

View File

@@ -29,27 +29,39 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )

-// StateDiffIndexer interface required to index statediff data
+// StateDiffIndexer describes the interface for indexing state data.
 type StateDiffIndexer interface {
-	DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
-	CurrentBlock() (*models.HeaderModel, error)
-	HasBlock(hash common.Hash, number uint64) (bool, error)
+	// PushBlock indexes block data except for state & storage nodes: header, uncles, transactions &
+	// receipts. Returns an initiated DB transaction which must be committed or rolled back.
 	PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
+	// PushHeader indexes a block header.
 	PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
+	// PushStateNode indexes a state node and its storage trie.
 	PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
+	// PushIPLD indexes an IPLD node.
 	PushIPLD(tx Batch, ipld sdtypes.IPLD) error
-	ReportDBMetrics(delay time.Duration, quit <-chan bool)
+	// BeginTx starts a new DB transaction.
 	BeginTx(number *big.Int, ctx context.Context) Batch
+	// DetectGaps returns a list of gaps in the block range, if any.
+	DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
+	// CurrentBlock returns the latest indexed block.
+	CurrentBlock() (*models.HeaderModel, error)
+	// HasBlock returns true if the block is indexed.
+	HasBlock(hash common.Hash, number uint64) (bool, error)
+	// Close closes the associated output DB connection or files.
+	Close() error

 	// Methods used by WatchAddress API/functionality
 	LoadWatchedAddresses() ([]common.Address, error)
 	InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
 	RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
 	SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
 	ClearWatchedAddresses() error
-	Close() error
+	ReportDBMetrics(delay time.Duration, quit <-chan bool)
 }

 // Batch required for indexing data atomically
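
A hedged sketch (not part of this diff) of the call order implied by the new doc comments: open a batch with PushBlock, index the header, then push state nodes against the returned headerID. The wrapper function is hypothetical; committing or rolling back the Batch is left out since its methods are not shown here.

```go
package example // hypothetical placement; illustrates the documented call order only

import (
	"math/big"

	"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
	sdtypes "github.com/cerc-io/plugeth-statediff/types"
	"github.com/ethereum/go-ethereum/core/types"
)

// indexBlock is a hypothetical helper: it pushes a block and its header, then
// the given state leaf nodes, all against the Batch returned by PushBlock.
func indexBlock(ind interfaces.StateDiffIndexer, block *types.Block, receipts types.Receipts,
	td, reward *big.Int, leaves []sdtypes.StateLeafNode) (interfaces.Batch, error) {
	batch, err := ind.PushBlock(block, receipts, td)
	if err != nil {
		return nil, err
	}
	headerID, err := ind.PushHeader(batch, block.Header(), reward, td)
	if err != nil {
		return batch, err
	}
	for _, leaf := range leaves {
		if err := ind.PushStateNode(batch, leaf, headerID); err != nil {
			return batch, err
		}
	}
	return batch, nil
}
```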

View File

@@ -26,7 +26,7 @@ func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
 	gethContext = ctx
 }

-func InitializeNode(stack core.Node, b core.Backend) {
+func InitializeNode(_ core.Node, b core.Backend) {
 	backend := b.(restricted.Backend)
 	networkid, err := strconv.ParseUint(gethContext.String(geth_flags.NetworkIdFlag.Name), 10, 64)

View File

@@ -18,14 +18,15 @@ echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env

 if [[ -z $SKIP_BUILD ]]; then
+  # Assume the tested image has been built separately.
   $laconic_so setup-repositories \
-    --exclude git.vdb.to/cerc-io/plugeth-statediff
+    --exclude git.vdb.to/cerc-io/plugeth-statediff,git.vdb.to/cerc-io/ipld-eth-server

-  # Assume the tested image has been built separately
   $laconic_so build-containers \
-    --exclude cerc/plugeth-statediff
+    --exclude cerc/plugeth-statediff,cerc/ipld-eth-server
 fi

 if ! $laconic_so deploy \
+  --exclude ipld-eth-server \
   --env-file $CONFIG_DIR/stack.env \
   --cluster test up
 then

View File

@@ -1,20 +0,0 @@
-version: "1.2"
-name: fixturenet-plugeth-tx
-description: "Plugeth Ethereum Fixturenet for testing plugeth-statediff"
-repos:
-  - git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2
-  - git.vdb.to/cerc-io/plugeth-statediff
-  - git.vdb.to/cerc-io/lighthouse
-  - git.vdb.to/cerc-io/ipld-eth-db@v5.4.0-alpha
-containers:
-  - cerc/plugeth-statediff
-  - cerc/plugeth
-  - cerc/fixturenet-eth-genesis
-  - cerc/fixturenet-plugeth-plugeth
-  - cerc/lighthouse
-  - cerc/lighthouse-cli
-  - cerc/fixturenet-eth-lighthouse
-  - cerc/ipld-eth-db
-pods:
-  - fixturenet-plugeth
-  - ipld-eth-db

View File

@@ -12,14 +12,15 @@ import (
 	"testing"

 	"github.com/cerc-io/eth-iterator-utils/tracker"
+	statediff "github.com/cerc-io/plugeth-statediff"
+	"github.com/cerc-io/plugeth-statediff/adapt"
+	sdtypes "github.com/cerc-io/plugeth-statediff/types"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/stretchr/testify/require"
-
-	"github.com/cerc-io/plugeth-statediff"
-	"github.com/cerc-io/plugeth-statediff/adapt"
-	sdtypes "github.com/cerc-io/plugeth-statediff/types"
 )

 var subtrieCounts = []uint{1, 8, 32}

View File

@@ -5,15 +5,16 @@ import (
 	"fmt"
 	"math/big"

-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/state"
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/params"
 	"github.com/cerc-io/plugeth-statediff"
 	"github.com/cerc-io/plugeth-statediff/adapt"
 	"github.com/cerc-io/plugeth-statediff/indexer"
 	"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
 	"github.com/cerc-io/plugeth-statediff/indexer/node"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/params"
 )

 type IndexChainParams struct {