WIP: Synchronize DelayedTx cache, clean up #31
@ -3,6 +3,7 @@ package sql
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
|
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
|
||||||
@ -15,6 +16,7 @@ const copyFromCheckLimit = 100
|
|||||||
type DelayedTx struct {
|
type DelayedTx struct {
|
||||||
cache []interface{}
|
cache []interface{}
|
||||||
db Database
|
db Database
|
||||||
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
type cachedStmt struct {
|
type cachedStmt struct {
|
||||||
sql string
|
sql string
|
||||||
@ -27,6 +29,8 @@ type copyFrom struct {
|
|||||||
rows [][]interface{}
|
rows [][]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type result int64
|
||||||
|
|
||||||
func (cf *copyFrom) appendRows(rows [][]interface{}) {
|
func (cf *copyFrom) appendRows(rows [][]interface{}) {
|
||||||
cf.rows = append(cf.rows, rows...)
|
cf.rows = append(cf.rows, rows...)
|
||||||
}
|
}
|
||||||
@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
||||||
|
tx.RLock()
|
||||||
|
defer tx.RUnlock()
|
||||||
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
||||||
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
||||||
if ok && prevCopy.matches(tableName, columnNames) {
|
if ok && prevCopy.matches(tableName, columnNames) {
|
||||||
@ -59,6 +65,8 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
|
|||||||
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
||||||
prevCopy.appendRows(rows)
|
prevCopy.appendRows(rows)
|
||||||
} else {
|
} else {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,8 +74,10 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
||||||
return nil, nil
|
return result(0), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
||||||
@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
rollback(ctx, base)
|
rollback(ctx, base)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
for _, item := range tx.cache {
|
for _, item := range tx.cache {
|
||||||
switch item := item.(type) {
|
switch item := item.(type) {
|
||||||
case *copyFrom:
|
case *copyFrom:
|
||||||
@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
||||||
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
tx.cache = nil
|
tx.cache = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RowsAffected satisfies sql.Result
|
||||||
|
func (r result) RowsAffected() (int64, error) {
|
||||||
|
return int64(r), nil
|
||||||
|
}
|
||||||
|
@ -29,27 +29,39 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// StateDiffIndexer interface required to index statediff data
|
// StateDiffIndexer describes the interface for indexing state data.
|
||||||
type StateDiffIndexer interface {
|
type StateDiffIndexer interface {
|
||||||
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
|
// PushBlock indexes block data except for state & storage nodes: header, uncles, transactions &
|
||||||
CurrentBlock() (*models.HeaderModel, error)
|
// receipts. Returns an initiated DB transaction which must be committed or rolled back.
|
||||||
HasBlock(hash common.Hash, number uint64) (bool, error)
|
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
||||||
|
// PushHeader indexes a block header.
|
||||||
PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
|
PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
|
||||||
|
// PushStateNode indexes a state node and its storage trie.
|
||||||
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
||||||
|
// PushIPLD indexes an IPLD node.
|
||||||
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
|
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
|
||||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
// BeginTx starts a new DB transaction.
|
||||||
|
|
||||||
BeginTx(number *big.Int, ctx context.Context) Batch
|
BeginTx(number *big.Int, ctx context.Context) Batch
|
||||||
|
|
||||||
|
// DetectGaps returns a list of gaps in the block range, if any.
|
||||||
|
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
|
||||||
|
// CurrentBlock returns the latest indexed block.
|
||||||
|
CurrentBlock() (*models.HeaderModel, error)
|
||||||
|
// HasBlock returns true if the block is indexed.
|
||||||
|
HasBlock(hash common.Hash, number uint64) (bool, error)
|
||||||
|
|
||||||
|
// Close closes the associated output DB connection or files.
|
||||||
|
Close() error
|
||||||
|
|
||||||
// Methods used by WatchAddress API/functionality
|
// Methods used by WatchAddress API/functionality
|
||||||
|
|
||||||
LoadWatchedAddresses() ([]common.Address, error)
|
LoadWatchedAddresses() ([]common.Address, error)
|
||||||
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
|
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
|
||||||
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
|
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
|
||||||
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
|
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
|
||||||
ClearWatchedAddresses() error
|
ClearWatchedAddresses() error
|
||||||
|
|
||||||
Close() error
|
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Batch required for indexing data atomically
|
// Batch required for indexing data atomically
|
||||||
|
@ -26,7 +26,7 @@ func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
|
|||||||
gethContext = ctx
|
gethContext = ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitializeNode(stack core.Node, b core.Backend) {
|
func InitializeNode(_ core.Node, b core.Backend) {
|
||||||
backend := b.(restricted.Backend)
|
backend := b.(restricted.Backend)
|
||||||
|
|
||||||
networkid, err := strconv.ParseUint(gethContext.String(geth_flags.NetworkIdFlag.Name), 10, 64)
|
networkid, err := strconv.ParseUint(gethContext.String(geth_flags.NetworkIdFlag.Name), 10, 64)
|
||||||
|
@ -18,14 +18,15 @@ echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env
|
|||||||
|
|
||||||
|
|
||||||
if [[ -z $SKIP_BUILD ]]; then
|
if [[ -z $SKIP_BUILD ]]; then
|
||||||
|
# Assume the tested image has been built separately.
|
||||||
$laconic_so setup-repositories \
|
$laconic_so setup-repositories \
|
||||||
--exclude git.vdb.to/cerc-io/plugeth-statediff
|
--exclude git.vdb.to/cerc-io/plugeth-statediff,git.vdb.to/cerc-io/ipld-eth-server
|
||||||
# Assume the tested image has been built separately
|
|
||||||
$laconic_so build-containers \
|
$laconic_so build-containers \
|
||||||
--exclude cerc/plugeth-statediff
|
--exclude cerc/plugeth-statediff,cerc/ipld-eth-server
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if ! $laconic_so deploy \
|
if ! $laconic_so deploy \
|
||||||
|
--exclude ipld-eth-server \
|
||||||
--env-file $CONFIG_DIR/stack.env \
|
--env-file $CONFIG_DIR/stack.env \
|
||||||
--cluster test up
|
--cluster test up
|
||||||
then
|
then
|
||||||
|
@ -1,20 +0,0 @@
|
|||||||
version: "1.2"
|
|
||||||
name: fixturenet-plugeth-tx
|
|
||||||
description: "Plugeth Ethereum Fixturenet for testing plugeth-statediff"
|
|
||||||
repos:
|
|
||||||
- git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2
|
|
||||||
- git.vdb.to/cerc-io/plugeth-statediff
|
|
||||||
- git.vdb.to/cerc-io/lighthouse
|
|
||||||
- git.vdb.to/cerc-io/ipld-eth-db@v5.4.0-alpha
|
|
||||||
containers:
|
|
||||||
- cerc/plugeth-statediff
|
|
||||||
- cerc/plugeth
|
|
||||||
- cerc/fixturenet-eth-genesis
|
|
||||||
- cerc/fixturenet-plugeth-plugeth
|
|
||||||
- cerc/lighthouse
|
|
||||||
- cerc/lighthouse-cli
|
|
||||||
- cerc/fixturenet-eth-lighthouse
|
|
||||||
- cerc/ipld-eth-db
|
|
||||||
pods:
|
|
||||||
- fixturenet-plugeth
|
|
||||||
- ipld-eth-db
|
|
@ -12,14 +12,15 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/cerc-io/eth-iterator-utils/tracker"
|
"github.com/cerc-io/eth-iterator-utils/tracker"
|
||||||
statediff "github.com/cerc-io/plugeth-statediff"
|
|
||||||
"github.com/cerc-io/plugeth-statediff/adapt"
|
|
||||||
sdtypes "github.com/cerc-io/plugeth-statediff/types"
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/cerc-io/plugeth-statediff"
|
||||||
|
"github.com/cerc-io/plugeth-statediff/adapt"
|
||||||
|
sdtypes "github.com/cerc-io/plugeth-statediff/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var subtrieCounts = []uint{1, 8, 32}
|
var subtrieCounts = []uint{1, 8, 32}
|
||||||
|
@ -5,15 +5,16 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
|
||||||
"github.com/cerc-io/plugeth-statediff"
|
"github.com/cerc-io/plugeth-statediff"
|
||||||
"github.com/cerc-io/plugeth-statediff/adapt"
|
"github.com/cerc-io/plugeth-statediff/adapt"
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer"
|
"github.com/cerc-io/plugeth-statediff/indexer"
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
|
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
|
||||||
"github.com/cerc-io/plugeth-statediff/indexer/node"
|
"github.com/cerc-io/plugeth-statediff/indexer/node"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/params"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type IndexChainParams struct {
|
type IndexChainParams struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user