cherry pick Thomas' work #349
@ -268,6 +268,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
if ctx.IsSet(utils.StateDiffLogStatements.Name) {
|
||||
pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffCopyFrom.Name) {
|
||||
pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name)
|
||||
}
|
||||
indexerConfig = pgConfig
|
||||
case shared.DUMP:
|
||||
dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)
|
||||
|
||||
@ -180,6 +180,7 @@ var (
|
||||
utils.StateDiffWatchedAddressesFilePath,
|
||||
utils.StateDiffUpsert,
|
||||
utils.StateDiffLogStatements,
|
||||
utils.StateDiffCopyFrom,
|
||||
configFileFlag,
|
||||
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
||||
|
||||
|
||||
@ -1083,6 +1083,13 @@ var (
|
||||
Usage: "Should the statediff service log all database statements? (Note: pgx only)",
|
||||
Value: false,
|
||||
}
|
||||
|
||||
StateDiffCopyFrom = &cli.BoolFlag{
|
||||
Name: "statediff.db.copyfrom",
|
||||
Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)",
|
||||
Value: false,
|
||||
}
|
||||
|
||||
StateDiffWritingFlag = &cli.BoolFlag{
|
||||
Name: "statediff.writing",
|
||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/consensus/misc"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
@ -330,7 +331,13 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa
|
||||
// the difficulty that a new block should have when created at time
|
||||
// given the parent block's time and difficulty.
|
||||
func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int {
|
||||
return CalcDifficulty(chain.Config(), time, parent)
|
||||
var config = chain.Config()
|
||||
var ret = CalcDifficulty(config, time, parent)
|
||||
if nil != config.CappedMaximumDifficulty && ret.Cmp(config.CappedMaximumDifficulty) >= 0 {
|
||||
log.Info(fmt.Sprintf("Using capped difficulty %d", config.CappedMaximumDifficulty))
|
||||
return config.CappedMaximumDifficulty
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// CalcDifficulty is the difficulty adjustment algorithm. It returns
|
||||
|
||||
@ -63,6 +63,13 @@ type Database struct {
|
||||
fn string // filename for reporting
|
||||
db *leveldb.DB // LevelDB instance
|
||||
|
||||
getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get().
|
||||
putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put().
|
||||
deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete().
|
||||
hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has().
|
||||
batchWriteTimer metrics.Timer // Timer/counter for measuring time and invocations of batch writes.
|
||||
batchItemCounter metrics.Counter // Counter for measuring number of batched items written.
|
||||
|
||||
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
|
||||
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
|
||||
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
|
||||
@ -144,6 +151,13 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option
|
||||
ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
|
||||
ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
|
||||
|
||||
ldb.getTimer = metrics.NewRegisteredTimer(namespace+"db/get/time", nil)
|
||||
ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil)
|
||||
ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil)
|
||||
ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil)
|
||||
ldb.batchWriteTimer = metrics.NewRegisteredTimer(namespace+"db/batch_write/time", nil)
|
||||
ldb.batchItemCounter = metrics.NewRegisteredCounter(namespace+"db/batch_item/count", nil)
|
||||
|
||||
// Start up the metrics gathering and return
|
||||
go ldb.meter(metricsGatheringInterval)
|
||||
return ldb, nil
|
||||
@ -182,11 +196,17 @@ func (db *Database) Close() error {
|
||||
|
||||
// Has retrieves if a key is present in the key-value store.
|
||||
func (db *Database) Has(key []byte) (bool, error) {
|
||||
if nil != db.hasTimer {
|
||||
defer func(start time.Time) { db.hasTimer.UpdateSince(start) }(time.Now())
|
||||
}
|
||||
return db.db.Has(key, nil)
|
||||
}
|
||||
|
||||
// Get retrieves the given key if it's present in the key-value store.
|
||||
func (db *Database) Get(key []byte) ([]byte, error) {
|
||||
if nil != db.getTimer {
|
||||
defer func(start time.Time) { db.getTimer.UpdateSince(start) }(time.Now())
|
||||
}
|
||||
dat, err := db.db.Get(key, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -196,11 +216,17 @@ func (db *Database) Get(key []byte) ([]byte, error) {
|
||||
|
||||
// Put inserts the given value into the key-value store.
|
||||
func (db *Database) Put(key []byte, value []byte) error {
|
||||
if nil != db.putTimer {
|
||||
defer func(start time.Time) { db.putTimer.UpdateSince(start) }(time.Now())
|
||||
}
|
||||
return db.db.Put(key, value, nil)
|
||||
}
|
||||
|
||||
// Delete removes the key from the key-value store.
|
||||
func (db *Database) Delete(key []byte) error {
|
||||
if nil != db.deleteTimer {
|
||||
defer func(start time.Time) { db.deleteTimer.UpdateSince(start) }(time.Now())
|
||||
}
|
||||
return db.db.Delete(key, nil)
|
||||
}
|
||||
|
||||
@ -208,16 +234,20 @@ func (db *Database) Delete(key []byte) error {
|
||||
// database until a final write is called.
|
||||
func (db *Database) NewBatch() ethdb.Batch {
|
||||
return &batch{
|
||||
db: db.db,
|
||||
b: new(leveldb.Batch),
|
||||
db: db.db,
|
||||
b: new(leveldb.Batch),
|
||||
writeTimer: &db.batchWriteTimer,
|
||||
itemCounter: &db.batchItemCounter,
|
||||
}
|
||||
}
|
||||
|
||||
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
|
||||
func (db *Database) NewBatchWithSize(size int) ethdb.Batch {
|
||||
return &batch{
|
||||
db: db.db,
|
||||
b: leveldb.MakeBatch(size),
|
||||
db: db.db,
|
||||
b: leveldb.MakeBatch(size),
|
||||
writeTimer: &db.batchWriteTimer,
|
||||
itemCounter: &db.batchItemCounter,
|
||||
}
|
||||
}
|
||||
|
||||
@ -468,9 +498,11 @@ func (db *Database) meter(refresh time.Duration) {
|
||||
// batch is a write-only leveldb batch that commits changes to its host database
|
||||
// when Write is called. A batch cannot be used concurrently.
|
||||
type batch struct {
|
||||
db *leveldb.DB
|
||||
b *leveldb.Batch
|
||||
size int
|
||||
db *leveldb.DB
|
||||
b *leveldb.Batch
|
||||
size int
|
||||
writeTimer *metrics.Timer
|
||||
itemCounter *metrics.Counter
|
||||
}
|
||||
|
||||
// Put inserts the given value into the batch for later committing.
|
||||
@ -494,6 +526,12 @@ func (b *batch) ValueSize() int {
|
||||
|
||||
// Write flushes any accumulated data to disk.
|
||||
func (b *batch) Write() error {
|
||||
if nil != *b.writeTimer {
|
||||
defer func(start time.Time) { (*b.writeTimer).UpdateSince(start) }(time.Now())
|
||||
}
|
||||
if nil != *b.itemCounter {
|
||||
(*b.itemCounter).Inc(int64(b.size))
|
||||
}
|
||||
return b.db.Write(b.b, nil)
|
||||
}
|
||||
|
||||
|
||||
4
go.mod
4
go.mod
@ -63,7 +63,7 @@ require (
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible
|
||||
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
|
||||
github.com/stretchr/testify v1.8.0
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/supranational/blst v0.3.8
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
|
||||
github.com/thoas/go-funk v0.9.2
|
||||
@ -120,7 +120,7 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/stretchr/objx v0.4.0 // indirect
|
||||
github.com/stretchr/objx v0.3.0 // indirect
|
||||
github.com/tklauser/numcpus v0.2.2 // indirect
|
||||
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
|
||||
8
go.sum
8
go.sum
@ -537,17 +537,15 @@ github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZL
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
|
||||
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
|
||||
github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
|
||||
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY=
|
||||
github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
|
||||
|
||||
@ -82,6 +82,9 @@ func (c *collector) addTimer(name string, m metrics.Timer) {
|
||||
c.writeSummaryPercentile(name, strconv.FormatFloat(pv[i], 'f', -1, 64), ps[i])
|
||||
}
|
||||
c.buff.WriteRune('\n')
|
||||
|
||||
c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, mutateKey(name+"_total")))
|
||||
c.buff.WriteString(fmt.Sprintf(keyValueTpl, mutateKey(name+"_total"), m.Total()))
|
||||
}
|
||||
|
||||
func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) {
|
||||
|
||||
@ -92,6 +92,9 @@ test_timer {quantile="0.99"} 1.2e+08
|
||||
test_timer {quantile="0.999"} 1.2e+08
|
||||
test_timer {quantile="0.9999"} 1.2e+08
|
||||
|
||||
# TYPE test_timer_total gauge
|
||||
test_timer_total 230000000
|
||||
|
||||
# TYPE test_resetting_timer_count counter
|
||||
test_resetting_timer_count 6
|
||||
|
||||
|
||||
@ -25,6 +25,7 @@ type Timer interface {
|
||||
Update(time.Duration)
|
||||
UpdateSince(time.Time)
|
||||
Variance() float64
|
||||
Total() int64
|
||||
}
|
||||
|
||||
// GetOrRegisterTimer returns an existing Timer or constructs and registers a
|
||||
@ -47,6 +48,7 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
|
||||
return &StandardTimer{
|
||||
histogram: h,
|
||||
meter: m,
|
||||
total: NewCounter(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,6 +74,7 @@ func NewTimer() Timer {
|
||||
return &StandardTimer{
|
||||
histogram: NewHistogram(NewExpDecaySample(1028, 0.015)),
|
||||
meter: NewMeter(),
|
||||
total: NewCounter(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,11 +137,15 @@ func (NilTimer) UpdateSince(time.Time) {}
|
||||
// Variance is a no-op.
|
||||
func (NilTimer) Variance() float64 { return 0.0 }
|
||||
|
||||
// Total is a no-op.
|
||||
func (NilTimer) Total() int64 { return int64(0) }
|
||||
|
||||
// StandardTimer is the standard implementation of a Timer and uses a Histogram
|
||||
// and Meter.
|
||||
type StandardTimer struct {
|
||||
histogram Histogram
|
||||
meter Meter
|
||||
total Counter
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
@ -200,6 +207,7 @@ func (t *StandardTimer) Snapshot() Timer {
|
||||
return &TimerSnapshot{
|
||||
histogram: t.histogram.Snapshot().(*HistogramSnapshot),
|
||||
meter: t.meter.Snapshot().(*MeterSnapshot),
|
||||
total: t.total.Snapshot().(CounterSnapshot),
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,14 +239,12 @@ func (t *StandardTimer) Update(d time.Duration) {
|
||||
defer t.mutex.Unlock()
|
||||
t.histogram.Update(int64(d))
|
||||
t.meter.Mark(1)
|
||||
t.total.Inc(int64(d))
|
||||
}
|
||||
|
||||
// Record the duration of an event that started at a time and ends now.
|
||||
func (t *StandardTimer) UpdateSince(ts time.Time) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
t.histogram.Update(int64(time.Since(ts)))
|
||||
t.meter.Mark(1)
|
||||
t.Update(time.Since(ts))
|
||||
}
|
||||
|
||||
// Variance returns the variance of the values in the sample.
|
||||
@ -246,10 +252,18 @@ func (t *StandardTimer) Variance() float64 {
|
||||
return t.histogram.Variance()
|
||||
}
|
||||
|
||||
// Total returns the total time the timer has run in nanoseconds.
|
||||
// This differs from Sum in that it is a simple counter, not based
|
||||
// on a histogram Sample.
|
||||
func (t *StandardTimer) Total() int64 {
|
||||
return t.total.Count()
|
||||
}
|
||||
|
||||
// TimerSnapshot is a read-only copy of another Timer.
|
||||
type TimerSnapshot struct {
|
||||
histogram *HistogramSnapshot
|
||||
meter *MeterSnapshot
|
||||
total CounterSnapshot
|
||||
}
|
||||
|
||||
// Count returns the number of events recorded at the time the snapshot was
|
||||
@ -324,3 +338,6 @@ func (*TimerSnapshot) UpdateSince(time.Time) {
|
||||
// Variance returns the variance of the values at the time the snapshot was
|
||||
// taken.
|
||||
func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() }
|
||||
|
||||
// Total returns the total time the timer has run in nanoseconds.
|
||||
func (t *TimerSnapshot) Total() int64 { return t.total.Count() }
|
||||
|
||||
@ -270,16 +270,16 @@ var (
|
||||
//
|
||||
// This configuration is intentionally not using keyed fields to force anyone
|
||||
// adding flags to the config to also have to set these fields.
|
||||
AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
|
||||
AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
|
||||
|
||||
// AllCliqueProtocolChanges contains every protocol change (EIPs) introduced
|
||||
// and accepted by the Ethereum core developers into the Clique consensus.
|
||||
//
|
||||
// This configuration is intentionally not using keyed fields to force anyone
|
||||
// adding flags to the config to also have to set these fields.
|
||||
AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
|
||||
AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}}
|
||||
|
||||
TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil}
|
||||
TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil}
|
||||
TestRules = TestChainConfig.Rules(new(big.Int), false)
|
||||
)
|
||||
|
||||
@ -377,6 +377,9 @@ type ChainConfig struct {
|
||||
// the network that triggers the consensus upgrade.
|
||||
TerminalTotalDifficulty *big.Int `json:"terminalTotalDifficulty,omitempty"`
|
||||
|
||||
// Cap the maximum total difficulty (for testnet use only).
|
||||
CappedMaximumDifficulty *big.Int `json:"cappedMaximumDifficulty,omitempty"`
|
||||
|
||||
// TerminalTotalDifficultyPassed is a flag specifying that the network already
|
||||
// passed the terminal total difficulty. Its purpose is to disable legacy sync
|
||||
// even without having seen the TTD locally (safer long term).
|
||||
@ -419,7 +422,11 @@ func (c *ChainConfig) String() string {
|
||||
switch {
|
||||
case c.Ethash != nil:
|
||||
if c.TerminalTotalDifficulty == nil {
|
||||
banner += "Consensus: Ethash (proof-of-work)\n"
|
||||
if nil == c.CappedMaximumDifficulty {
|
||||
banner += "Consensus: Ethash (proof-of-work)\n"
|
||||
} else {
|
||||
banner += fmt.Sprintf("Consensus: Ethash (proof-of-work, capped difficulty at %d)\n", c.CappedMaximumDifficulty)
|
||||
}
|
||||
} else if !c.TerminalTotalDifficultyPassed {
|
||||
banner += "Consensus: Beacon (proof-of-stake), merging from Ethash (proof-of-work)\n"
|
||||
} else {
|
||||
@ -434,7 +441,11 @@ func (c *ChainConfig) String() string {
|
||||
banner += "Consensus: Beacon (proof-of-stake), merged from Clique (proof-of-authority)\n"
|
||||
}
|
||||
default:
|
||||
banner += "Consensus: unknown\n"
|
||||
if nil == c.CappedMaximumDifficulty {
|
||||
banner += "Consensus: unknown\n"
|
||||
} else {
|
||||
banner += fmt.Sprintf("Consensus: unknown (capped difficulty at %d)\n", c.CappedMaximumDifficulty)
|
||||
}
|
||||
}
|
||||
banner += "\n"
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ package statediff
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
"github.com/ethereum/go-ethereum/statediff/trie_helpers"
|
||||
@ -45,7 +47,7 @@ var (
|
||||
// Builder interface exposes the method for building a state diff between two blocks
|
||||
type Builder interface {
|
||||
BuildStateDiffObject(args Args, params Params) (types2.StateObject, error)
|
||||
WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
|
||||
WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error
|
||||
}
|
||||
|
||||
type StateDiffBuilder struct {
|
||||
@ -84,11 +86,10 @@ func NewBuilder(stateCache state.Database) Builder {
|
||||
|
||||
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
|
||||
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStateDiffObjectTimer)
|
||||
var stateNodes []types2.StateLeafNode
|
||||
var iplds []types2.IPLD
|
||||
err := sdb.WriteStateDiffObject(
|
||||
types2.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot},
|
||||
params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
|
||||
err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
|
||||
if err != nil {
|
||||
return types2.StateObject{}, err
|
||||
}
|
||||
@ -101,8 +102,9 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ
|
||||
}
|
||||
|
||||
// WriteStateDiffObject writes a statediff object to output sinks
|
||||
func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink,
|
||||
func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink,
|
||||
ipldOutput types2.IPLDSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.WriteStateDiffObjectTimer)
|
||||
// Load tries for old and new states
|
||||
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
|
||||
if err != nil {
|
||||
@ -128,16 +130,19 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params
|
||||
},
|
||||
}
|
||||
|
||||
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput)
|
||||
logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber)
|
||||
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger)
|
||||
}
|
||||
|
||||
func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params,
|
||||
output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
|
||||
output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
|
||||
logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes")
|
||||
defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer)
|
||||
// collect a slice of all the nodes that were touched and exist at B (B-A)
|
||||
// a map of their leafkey to all the accounts that were touched and exist at B
|
||||
// and a slice of all the paths for the nodes in both of the above sets
|
||||
diffAccountsAtB, err := sdb.createdAndUpdatedState(
|
||||
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput)
|
||||
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
|
||||
}
|
||||
@ -146,28 +151,34 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
|
||||
// a map of their leafkey to all the accounts that were touched and exist at A
|
||||
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
|
||||
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
|
||||
params.watchedAddressesLeafPaths, output)
|
||||
params.watchedAddressesLeafPaths, output, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
|
||||
}
|
||||
|
||||
// collect and sort the leafkey keys for both account mappings into a slice
|
||||
t := time.Now()
|
||||
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
|
||||
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
|
||||
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds()))
|
||||
|
||||
// and then find the intersection of these keys
|
||||
// these are the leafkeys for the accounts which exist at both A and B but are different
|
||||
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
|
||||
// and leaving the truly created or deleted keys in place
|
||||
t = time.Now()
|
||||
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
|
||||
logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms",
|
||||
len(updatedKeys),
|
||||
time.Since(t).Milliseconds()))
|
||||
|
||||
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
|
||||
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput)
|
||||
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error building diff for updated accounts: %v", err)
|
||||
}
|
||||
// build the diff nodes for created accounts
|
||||
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput)
|
||||
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error building diff for created accounts: %v", err)
|
||||
}
|
||||
@ -179,11 +190,13 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
|
||||
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
|
||||
// and a slice of the paths for all of the nodes included in both
|
||||
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
|
||||
watchedAddressesLeafPaths [][]byte, output types2.IPLDSink) (types2.AccountMap, error) {
|
||||
watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) {
|
||||
logger.Debug("statediff BEGIN createdAndUpdatedState")
|
||||
defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer)
|
||||
diffAccountsAtB := make(types2.AccountMap)
|
||||
watchingAddresses := len(watchedAddressesLeafPaths) > 0
|
||||
|
||||
it, _ := trie.NewDifferenceIterator(a, b)
|
||||
it, itCount := trie.NewDifferenceIterator(a, b)
|
||||
for it.Next(true) {
|
||||
// ignore node if it is not along paths of interest
|
||||
if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) {
|
||||
@ -236,6 +249,8 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.Debug("statediff COUNTS createdAndUpdatedStateWithIntermediateNodes", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
|
||||
metrics2.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
|
||||
return diffAccountsAtB, it.Error()
|
||||
}
|
||||
|
||||
@ -275,7 +290,9 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched
|
||||
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
|
||||
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
|
||||
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap,
|
||||
watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, error) {
|
||||
watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) {
|
||||
logger.Debug("statediff BEGIN deletedOrUpdatedState")
|
||||
defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer)
|
||||
diffAccountAtA := make(types2.AccountMap)
|
||||
watchingAddresses := len(watchedAddressesLeafPaths) > 0
|
||||
|
||||
@ -330,7 +347,9 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
|
||||
// needs to be called before building account creations and deletions as this mutates
|
||||
// those account maps to remove the accounts which were updated
|
||||
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string,
|
||||
output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
|
||||
output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error {
|
||||
logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys))
|
||||
defer metrics2.ReportAndUpdateDuration("statediff END buildAccountUpdates ", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer)
|
||||
var err error
|
||||
for _, key := range updatedKeys {
|
||||
createdAcc := creations[key]
|
||||
@ -361,7 +380,10 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc
|
||||
|
||||
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
|
||||
// it also returns the code and codehash for created contract accounts
|
||||
func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error {
|
||||
func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink,
|
||||
ipldOutput types2.IPLDSink, logger log.Logger) error {
|
||||
logger.Debug("statediff BEGIN buildAccountCreations")
|
||||
defer metrics2.ReportAndUpdateDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer)
|
||||
for _, val := range accounts {
|
||||
diff := types2.StateLeafNode{
|
||||
AccountWrapper: val,
|
||||
@ -400,6 +422,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, o
|
||||
// i.e. it returns all the storage nodes at this state, since there is no previous state
|
||||
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output types2.StorageNodeSink,
|
||||
ipldOutput types2.IPLDSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesEventualTimer)
|
||||
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
|
||||
return nil
|
||||
}
|
||||
@ -421,6 +444,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output ty
|
||||
// including intermediate nodes can be turned on or off
|
||||
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink,
|
||||
ipldOutput types2.IPLDSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesFromTrieTimer)
|
||||
for it.Next(true) {
|
||||
if it.Leaf() {
|
||||
storageLeafNode, err := sdb.processStorageValueNode(it)
|
||||
@ -470,6 +494,7 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator) (type
|
||||
|
||||
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
|
||||
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output types2.StorageNodeSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
|
||||
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
|
||||
return nil
|
||||
}
|
||||
@ -489,6 +514,7 @@ func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, out
|
||||
|
||||
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
|
||||
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
|
||||
for it.Next(true) {
|
||||
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
|
||||
leafKey := make([]byte, len(it.LeafKey()))
|
||||
@ -509,6 +535,7 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
|
||||
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
|
||||
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, output types2.StorageNodeSink,
|
||||
ipldOutput types2.IPLDSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesIncrementalTimer)
|
||||
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
|
||||
return nil
|
||||
}
|
||||
@ -537,6 +564,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new
|
||||
|
||||
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output types2.StorageNodeSink,
|
||||
ipldOutput types2.IPLDSink) (map[string]bool, error) {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.CreatedAndUpdatedStorageTimer)
|
||||
diffSlotsAtB := make(map[string]bool)
|
||||
it, _ := trie.NewDifferenceIterator(a, b)
|
||||
for it.Next(true) {
|
||||
@ -566,6 +594,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou
|
||||
}
|
||||
|
||||
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output types2.StorageNodeSink) error {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.DeletedOrUpdatedStorageTimer)
|
||||
it, _ := trie.NewDifferenceIterator(b, a)
|
||||
for it.Next(true) {
|
||||
if it.Leaf() {
|
||||
@ -602,6 +631,7 @@ func isValidPrefixPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) b
|
||||
|
||||
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
|
||||
func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.IsWatchedAddressTimer)
|
||||
for _, watchedAddressPath := range watchedAddressesLeafPaths {
|
||||
if bytes.Equal(watchedAddressPath, valueNodePath) {
|
||||
return true
|
||||
|
||||
@ -29,9 +29,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
@ -41,10 +41,6 @@ import (
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
|
||||
var (
|
||||
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
||||
type StateDiffIndexer struct {
|
||||
dump io.WriteCloser
|
||||
@ -106,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
close(self.quit)
|
||||
close(self.iplds)
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
if err := self.flush(); err != nil {
|
||||
@ -115,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
@ -125,7 +121,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
go blockTx.cache()
|
||||
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
@ -137,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index uncles
|
||||
@ -146,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index receipts and txs
|
||||
@ -163,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package dump
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type indexerMetricsHandles struct {
|
||||
// The total number of processed blocks
|
||||
blocks metrics.Counter
|
||||
// The total number of processed transactions
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// The total number of processed logs
|
||||
logs metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
accessListEntries metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
tFreePostgres metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
tPostgresCommit metrics.Timer
|
||||
// Header processing time
|
||||
tHeaderProcessing metrics.Timer
|
||||
// Uncle processing time
|
||||
tUncleProcessing metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
tTxAndRecProcessing metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
tStateStoreCodeProcessing metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
ctx := indexerMetricsHandles{
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
logs: metrics.NewCounter(),
|
||||
accessListEntries: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
tHeaderProcessing: metrics.NewTimer(),
|
||||
tUncleProcessing: metrics.NewTimer(),
|
||||
tTxAndRecProcessing: metrics.NewTimer(),
|
||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "indexer"
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.logs)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||
return ctx
|
||||
}
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
@ -235,7 +236,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
|
||||
csw.rows <- tableRow{schema.TableHeader, values}
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
|
||||
@ -250,7 +251,7 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
|
||||
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
|
||||
transaction.Src, transaction.Index, transaction.Type, transaction.Value)
|
||||
csw.rows <- tableRow{schema.TableTransaction, values}
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
|
||||
@ -258,7 +259,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
|
||||
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
|
||||
rct.PostState, rct.PostStatus)
|
||||
csw.rows <- tableRow{schema.TableReceipt, values}
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
|
||||
@ -267,7 +268,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
|
||||
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
|
||||
l.Topic1, l.Topic2, l.Topic3)
|
||||
csw.rows <- tableRow{schema.TableLog, values}
|
||||
indexerMetrics.logs.Inc(1)
|
||||
metrics.IndexerMetrics.LogsCounter.Inc(1)
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,13 +285,8 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
}
|
||||
|
||||
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
var storageKey string
|
||||
if storageCID.StorageKey != nullHash.String() {
|
||||
storageKey = storageCID.StorageKey
|
||||
}
|
||||
|
||||
var values []interface{}
|
||||
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID,
|
||||
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||
true, storageCID.Value, storageCID.Removed)
|
||||
csw.rows <- tableRow{schema.TableStorageNode, values}
|
||||
}
|
||||
|
||||
@ -34,9 +34,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address,
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
|
||||
var (
|
||||
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
||||
type StateDiffIndexer struct {
|
||||
fileWriter FileWriter
|
||||
@ -172,12 +168,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
BlockNumber: block.Number().String(),
|
||||
submit: func(self *BatchTx, err error) error {
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
sdi.fileWriter.Flush()
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
@ -185,21 +181,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
},
|
||||
}
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write header, collect headerID
|
||||
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write uncles
|
||||
sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
@ -217,7 +213,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type indexerMetricsHandles struct {
|
||||
// The total number of processed blocks
|
||||
blocks metrics.Counter
|
||||
// The total number of processed transactions
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// The total number of processed logs
|
||||
logs metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
accessListEntries metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
tFreePostgres metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
tPostgresCommit metrics.Timer
|
||||
// Header processing time
|
||||
tHeaderProcessing metrics.Timer
|
||||
// Uncle processing time
|
||||
tUncleProcessing metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
tTxAndRecProcessing metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
tStateStoreCodeProcessing metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
ctx := indexerMetricsHandles{
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
logs: metrics.NewCounter(),
|
||||
accessListEntries: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
tHeaderProcessing: metrics.NewTimer(),
|
||||
tUncleProcessing: metrics.NewTimer(),
|
||||
tTxAndRecProcessing: metrics.NewTimer(),
|
||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "indexer"
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.logs)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||
return ctx
|
||||
}
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
@ -35,7 +36,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize)
|
||||
writeBufferSize = pipeSize * 16 * 96
|
||||
)
|
||||
@ -191,7 +191,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
|
||||
sqw.stmts <- []byte(stmt)
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
|
||||
@ -202,20 +202,20 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
|
||||
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
|
||||
transaction.Src, transaction.Index, transaction.Type, transaction.Value))
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
|
||||
rct.PostState, rct.PostStatus))
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
|
||||
for _, l := range logs {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
|
||||
l.Topic1, l.Topic2, l.Topic3))
|
||||
indexerMetrics.logs.Inc(1)
|
||||
metrics.IndexerMetrics.LogsCounter.Inc(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
263
statediff/indexer/database/metrics/metrics.go
Normal file
263
statediff/indexer/database/metrics/metrics.go
Normal file
@ -0,0 +1,263 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
var (
|
||||
IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type IndexerMetricsHandles struct {
|
||||
// The total number of processed BlocksCounter
|
||||
BlocksCounter metrics.Counter
|
||||
// The total number of processed transactions
|
||||
TransactionsCounter metrics.Counter
|
||||
// The total number of processed receipts
|
||||
ReceiptsCounter metrics.Counter
|
||||
// The total number of processed logs
|
||||
LogsCounter metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
AccessListEntriesCounter metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
FreePostgresTimer metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
PostgresCommitTimer metrics.Timer
|
||||
// Header processing time
|
||||
HeaderProcessingTimer metrics.Timer
|
||||
// Uncle processing time
|
||||
UncleProcessingTimer metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
TxAndRecProcessingTimer metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
StateStoreCodeProcessingTimer metrics.Timer
|
||||
|
||||
// Fine-grained code timers
|
||||
BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer
|
||||
BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer
|
||||
CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer
|
||||
DeletedOrUpdatedStateTimer metrics.Timer
|
||||
BuildAccountUpdatesTimer metrics.Timer
|
||||
BuildAccountCreationsTimer metrics.Timer
|
||||
ResolveNodeTimer metrics.Timer
|
||||
SortKeysTimer metrics.Timer
|
||||
FindIntersectionTimer metrics.Timer
|
||||
OutputTimer metrics.Timer
|
||||
IPLDOutputTimer metrics.Timer
|
||||
DifferenceIteratorNextTimer metrics.Timer
|
||||
DifferenceIteratorCounter metrics.Counter
|
||||
DeletedOrUpdatedStorageTimer metrics.Timer
|
||||
CreatedAndUpdatedStorageTimer metrics.Timer
|
||||
BuildStorageNodesIncrementalTimer metrics.Timer
|
||||
BuildStateTrieObjectTimer metrics.Timer
|
||||
BuildStateTrieTimer metrics.Timer
|
||||
BuildStateDiffObjectTimer metrics.Timer
|
||||
WriteStateDiffObjectTimer metrics.Timer
|
||||
CreatedAndUpdatedStateTimer metrics.Timer
|
||||
BuildStorageNodesEventualTimer metrics.Timer
|
||||
BuildStorageNodesFromTrieTimer metrics.Timer
|
||||
BuildRemovedAccountStorageNodesTimer metrics.Timer
|
||||
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
|
||||
IsWatchedAddressTimer metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
|
||||
ctx := IndexerMetricsHandles{
|
||||
BlocksCounter: metrics.NewCounter(),
|
||||
TransactionsCounter: metrics.NewCounter(),
|
||||
ReceiptsCounter: metrics.NewCounter(),
|
||||
LogsCounter: metrics.NewCounter(),
|
||||
AccessListEntriesCounter: metrics.NewCounter(),
|
||||
FreePostgresTimer: metrics.NewTimer(),
|
||||
PostgresCommitTimer: metrics.NewTimer(),
|
||||
HeaderProcessingTimer: metrics.NewTimer(),
|
||||
UncleProcessingTimer: metrics.NewTimer(),
|
||||
TxAndRecProcessingTimer: metrics.NewTimer(),
|
||||
StateStoreCodeProcessingTimer: metrics.NewTimer(),
|
||||
BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(),
|
||||
BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(),
|
||||
DeletedOrUpdatedStateTimer: metrics.NewTimer(),
|
||||
BuildAccountUpdatesTimer: metrics.NewTimer(),
|
||||
BuildAccountCreationsTimer: metrics.NewTimer(),
|
||||
ResolveNodeTimer: metrics.NewTimer(),
|
||||
SortKeysTimer: metrics.NewTimer(),
|
||||
FindIntersectionTimer: metrics.NewTimer(),
|
||||
OutputTimer: metrics.NewTimer(),
|
||||
IPLDOutputTimer: metrics.NewTimer(),
|
||||
DifferenceIteratorNextTimer: metrics.NewTimer(),
|
||||
DifferenceIteratorCounter: metrics.NewCounter(),
|
||||
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
|
||||
BuildStateTrieObjectTimer: metrics.NewTimer(),
|
||||
BuildStateTrieTimer: metrics.NewTimer(),
|
||||
BuildStateDiffObjectTimer: metrics.NewTimer(),
|
||||
WriteStateDiffObjectTimer: metrics.NewTimer(),
|
||||
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesEventualTimer: metrics.NewTimer(),
|
||||
BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
|
||||
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
|
||||
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
|
||||
IsWatchedAddressTimer: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "indexer"
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
|
||||
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
|
||||
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
|
||||
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
|
||||
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
|
||||
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
|
||||
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
|
||||
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
|
||||
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
|
||||
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
|
||||
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
|
||||
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
|
||||
reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_build_state_trie"), ctx.BuildStateTrieTimer)
|
||||
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer)
|
||||
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
|
||||
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer)
|
||||
reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer)
|
||||
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer)
|
||||
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
|
||||
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
|
||||
|
||||
log.Debug("Registering statediff indexer metrics.")
|
||||
return ctx
|
||||
}
|
||||
|
||||
type dbMetricsHandles struct {
|
||||
// Maximum number of open connections to the sql
|
||||
maxOpen metrics.Gauge
|
||||
// The number of established connections both in use and idle
|
||||
open metrics.Gauge
|
||||
// The number of connections currently in use
|
||||
inUse metrics.Gauge
|
||||
// The number of idle connections
|
||||
idle metrics.Gauge
|
||||
// The total number of connections waited for
|
||||
waitedFor metrics.Counter
|
||||
// The total time blocked waiting for a new connection
|
||||
blockedMilliseconds metrics.Counter
|
||||
// The total number of connections closed due to SetMaxIdleConns
|
||||
closedMaxIdle metrics.Counter
|
||||
// The total number of connections closed due to SetConnMaxLifetime
|
||||
closedMaxLifetime metrics.Counter
|
||||
}
|
||||
|
||||
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
|
||||
ctx := dbMetricsHandles{
|
||||
maxOpen: metrics.NewGauge(),
|
||||
open: metrics.NewGauge(),
|
||||
inUse: metrics.NewGauge(),
|
||||
idle: metrics.NewGauge(),
|
||||
waitedFor: metrics.NewCounter(),
|
||||
blockedMilliseconds: metrics.NewCounter(),
|
||||
closedMaxIdle: metrics.NewCounter(),
|
||||
closedMaxLifetime: metrics.NewCounter(),
|
||||
}
|
||||
subsys := "connections"
|
||||
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
|
||||
reg.Register(metricName(subsys, "open"), ctx.open)
|
||||
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
|
||||
reg.Register(metricName(subsys, "idle"), ctx.idle)
|
||||
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
|
||||
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
|
||||
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
|
||||
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
|
||||
|
||||
log.Debug("Registering statediff DB metrics.")
|
||||
return ctx
|
||||
}
|
||||
|
||||
// DbStats interface to accommodate different concrete sql stats types
|
||||
type DbStats interface {
|
||||
MaxOpen() int64
|
||||
Open() int64
|
||||
InUse() int64
|
||||
Idle() int64
|
||||
WaitCount() int64
|
||||
WaitDuration() time.Duration
|
||||
MaxIdleClosed() int64
|
||||
MaxLifetimeClosed() int64
|
||||
}
|
||||
|
||||
func (met *dbMetricsHandles) Update(stats DbStats) {
|
||||
met.maxOpen.Update(stats.MaxOpen())
|
||||
met.open.Update(stats.Open())
|
||||
met.inUse.Update(stats.InUse())
|
||||
met.idle.Update(stats.Idle())
|
||||
met.waitedFor.Inc(stats.WaitCount())
|
||||
met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
|
||||
met.closedMaxIdle.Inc(stats.MaxIdleClosed())
|
||||
met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
|
||||
}
|
||||
|
||||
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
|
||||
since := UpdateDuration(start, timer)
|
||||
logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds()))
|
||||
}
|
||||
|
||||
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {
|
||||
since := time.Since(start)
|
||||
timer.Update(since)
|
||||
return since
|
||||
}
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
@ -44,11 +45,6 @@ import (
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
|
||||
var (
|
||||
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
|
||||
type StateDiffIndexer struct {
|
||||
ctx context.Context
|
||||
@ -75,7 +71,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
dbMetrics.Update(sdi.dbWriter.db.Stats())
|
||||
metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats())
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
@ -156,7 +152,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
rollback(sdi.ctx, tx)
|
||||
} else {
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
if err := self.flush(); err != nil {
|
||||
@ -167,7 +163,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
}
|
||||
err = tx.Commit(sdi.ctx)
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
}
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
@ -178,7 +174,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
go blockTx.cache()
|
||||
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
@ -190,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index uncles
|
||||
@ -199,7 +195,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
// Publish and index receipts and txs
|
||||
@ -216,7 +212,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
|
||||
@ -19,7 +19,8 @@ package sql
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
)
|
||||
|
||||
// Database interfaces required by the sql indexer
|
||||
@ -30,12 +31,13 @@ type Database interface {
|
||||
|
||||
// Driver interface has all the methods required by a driver implementation to support the sql indexer
|
||||
type Driver interface {
|
||||
UseCopyFrom() bool
|
||||
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
|
||||
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
|
||||
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
|
||||
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
|
||||
Begin(ctx context.Context) (Tx, error)
|
||||
Stats() Stats
|
||||
Stats() metrics.DbStats
|
||||
NodeID() string
|
||||
Context() context.Context
|
||||
io.Closer
|
||||
@ -52,12 +54,25 @@ type Statements interface {
|
||||
InsertStorageStm() string
|
||||
InsertIPLDStm() string
|
||||
InsertIPLDsStm() string
|
||||
|
||||
// Table/column descriptions for use with CopyFrom and similar commands.
|
||||
LogTableName() []string
|
||||
LogColumnNames() []string
|
||||
RctTableName() []string
|
||||
RctColumnNames() []string
|
||||
StateTableName() []string
|
||||
StateColumnNames() []string
|
||||
StorageTableName() []string
|
||||
StorageColumnNames() []string
|
||||
TxTableName() []string
|
||||
TxColumnNames() []string
|
||||
}
|
||||
|
||||
// Tx interface to accommodate different concrete SQL transaction types
|
||||
type Tx interface {
|
||||
QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow
|
||||
Exec(ctx context.Context, sql string, args ...interface{}) (Result, error)
|
||||
CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error)
|
||||
Commit(ctx context.Context) error
|
||||
Rollback(ctx context.Context) error
|
||||
}
|
||||
@ -71,15 +86,3 @@ type ScannableRow interface {
|
||||
type Result interface {
|
||||
RowsAffected() (int64, error)
|
||||
}
|
||||
|
||||
// Stats interface to accommodate different concrete sql stats types
|
||||
type Stats interface {
|
||||
MaxOpen() int64
|
||||
Open() int64
|
||||
InUse() int64
|
||||
Idle() int64
|
||||
WaitCount() int64
|
||||
WaitDuration() time.Duration
|
||||
MaxIdleClosed() int64
|
||||
MaxLifetimeClosed() int64
|
||||
}
|
||||
|
||||
@ -2,10 +2,16 @@ package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
// Changing this to 1 would make sure only sequential COPYs were combined.
|
||||
const copyFromCheckLimit = 100
|
||||
|
||||
type DelayedTx struct {
|
||||
cache []cachedStmt
|
||||
cache []interface{}
|
||||
db Database
|
||||
}
|
||||
type cachedStmt struct {
|
||||
@ -13,6 +19,20 @@ type cachedStmt struct {
|
||||
args []interface{}
|
||||
}
|
||||
|
||||
type copyFrom struct {
|
||||
tableName []string
|
||||
columnNames []string
|
||||
rows [][]interface{}
|
||||
}
|
||||
|
||||
func (cf *copyFrom) appendRows(rows [][]interface{}) {
|
||||
cf.rows = append(cf.rows, rows...)
|
||||
}
|
||||
|
||||
func (cf *copyFrom) matches(tableName []string, columnNames []string) bool {
|
||||
return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames)
|
||||
}
|
||||
|
||||
func NewDelayedTx(db Database) *DelayedTx {
|
||||
return &DelayedTx{db: db}
|
||||
}
|
||||
@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
|
||||
return tx.db.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
||||
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
||||
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
||||
if ok && prevCopy.matches(tableName, columnNames) {
|
||||
return prevCopy, count
|
||||
}
|
||||
}
|
||||
return nil, -1
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
|
||||
if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy {
|
||||
log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName,
|
||||
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
||||
prevCopy.appendRows(rows)
|
||||
} else {
|
||||
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
||||
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
||||
return nil, nil
|
||||
@ -39,10 +81,19 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
||||
rollback(ctx, base)
|
||||
}
|
||||
}()
|
||||
for _, stmt := range tx.cache {
|
||||
_, err := base.Exec(ctx, stmt.sql, stmt.args...)
|
||||
if err != nil {
|
||||
return err
|
||||
for _, item := range tx.cache {
|
||||
switch item := item.(type) {
|
||||
case *copyFrom:
|
||||
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
||||
if err != nil {
|
||||
log.Error("COPY error", "table", item.tableName, "err", err)
|
||||
return err
|
||||
}
|
||||
case cachedStmt:
|
||||
_, err := base.Exec(ctx, item.sql, item.args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.cache = nil
|
||||
|
||||
@ -1,147 +0,0 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package sql
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type indexerMetricsHandles struct {
|
||||
// The total number of processed blocks
|
||||
blocks metrics.Counter
|
||||
// The total number of processed transactions
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// The total number of processed logs
|
||||
logs metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
accessListEntries metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
tFreePostgres metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
tPostgresCommit metrics.Timer
|
||||
// Header processing time
|
||||
tHeaderProcessing metrics.Timer
|
||||
// Uncle processing time
|
||||
tUncleProcessing metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
tTxAndRecProcessing metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
tStateStoreCodeProcessing metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
ctx := indexerMetricsHandles{
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
logs: metrics.NewCounter(),
|
||||
accessListEntries: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
tHeaderProcessing: metrics.NewTimer(),
|
||||
tUncleProcessing: metrics.NewTimer(),
|
||||
tTxAndRecProcessing: metrics.NewTimer(),
|
||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "indexer"
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.logs)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||
return ctx
|
||||
}
|
||||
|
||||
type dbMetricsHandles struct {
|
||||
// Maximum number of open connections to the sql
|
||||
maxOpen metrics.Gauge
|
||||
// The number of established connections both in use and idle
|
||||
open metrics.Gauge
|
||||
// The number of connections currently in use
|
||||
inUse metrics.Gauge
|
||||
// The number of idle connections
|
||||
idle metrics.Gauge
|
||||
// The total number of connections waited for
|
||||
waitedFor metrics.Counter
|
||||
// The total time blocked waiting for a new connection
|
||||
blockedMilliseconds metrics.Counter
|
||||
// The total number of connections closed due to SetMaxIdleConns
|
||||
closedMaxIdle metrics.Counter
|
||||
// The total number of connections closed due to SetConnMaxLifetime
|
||||
closedMaxLifetime metrics.Counter
|
||||
}
|
||||
|
||||
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
|
||||
ctx := dbMetricsHandles{
|
||||
maxOpen: metrics.NewGauge(),
|
||||
open: metrics.NewGauge(),
|
||||
inUse: metrics.NewGauge(),
|
||||
idle: metrics.NewGauge(),
|
||||
waitedFor: metrics.NewCounter(),
|
||||
blockedMilliseconds: metrics.NewCounter(),
|
||||
closedMaxIdle: metrics.NewCounter(),
|
||||
closedMaxLifetime: metrics.NewCounter(),
|
||||
}
|
||||
subsys := "connections"
|
||||
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
|
||||
reg.Register(metricName(subsys, "open"), ctx.open)
|
||||
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
|
||||
reg.Register(metricName(subsys, "idle"), ctx.idle)
|
||||
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
|
||||
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
|
||||
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
|
||||
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (met *dbMetricsHandles) Update(stats Stats) {
|
||||
met.maxOpen.Update(stats.MaxOpen())
|
||||
met.open.Update(stats.Open())
|
||||
met.inUse.Update(stats.InUse())
|
||||
met.idle.Update(stats.Idle())
|
||||
met.waitedFor.Inc(stats.WaitCount())
|
||||
met.blockedMilliseconds.Inc(stats.WaitDuration().Milliseconds())
|
||||
met.closedMaxIdle.Inc(stats.MaxIdleClosed())
|
||||
met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed())
|
||||
}
|
||||
@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
func setupLegacyPGXIndexer(t *testing.T) {
|
||||
db, err = postgres.SetupPGXDB()
|
||||
db, err = postgres.SetupPGXDB(postgres.DefaultConfig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -29,8 +29,8 @@ import (
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/test"
|
||||
)
|
||||
|
||||
func setupPGXIndexer(t *testing.T) {
|
||||
db, err = postgres.SetupPGXDB()
|
||||
func setupPGXIndexer(t *testing.T, config postgres.Config) {
|
||||
db, err = postgres.SetupPGXDB(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) {
|
||||
}
|
||||
|
||||
func setupPGX(t *testing.T) {
|
||||
setupPGXIndexer(t)
|
||||
setupPGXWithConfig(t, postgres.DefaultConfig)
|
||||
}
|
||||
|
||||
func setupPGXWithConfig(t *testing.T, config postgres.Config) {
|
||||
setupPGXIndexer(t, config)
|
||||
test.SetupTestData(t, ind)
|
||||
}
|
||||
|
||||
func setupPGXNonCanonical(t *testing.T) {
|
||||
setupPGXIndexer(t)
|
||||
setupPGXIndexer(t, postgres.DefaultConfig)
|
||||
test.SetupTestDataNonCanonical(t, ind)
|
||||
}
|
||||
|
||||
@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) {
|
||||
|
||||
test.TestPublishAndIndexStorageIPLDs(t, db)
|
||||
})
|
||||
|
||||
t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) {
|
||||
config := postgres.DefaultConfig
|
||||
config.CopyFrom = true
|
||||
|
||||
setupPGXWithConfig(t, config)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
|
||||
test.TestPublishAndIndexStateIPLDs(t, db)
|
||||
test.TestPublishAndIndexStorageIPLDs(t, db)
|
||||
test.TestPublishAndIndexReceiptIPLDs(t, db)
|
||||
test.TestPublishAndIndexLogIPLDs(t, db)
|
||||
})
|
||||
}
|
||||
|
||||
// Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1
|
||||
@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPGXWatchAddressMethods(t *testing.T) {
|
||||
setupPGXIndexer(t)
|
||||
setupPGXIndexer(t, postgres.DefaultConfig)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
|
||||
|
||||
@ -92,6 +92,9 @@ type Config struct {
|
||||
|
||||
// toggle on/off upserts
|
||||
Upsert bool
|
||||
|
||||
// toggle on/off CopyFrom
|
||||
CopyFrom bool
|
||||
}
|
||||
|
||||
// Type satisfies interfaces.Config
|
||||
|
||||
@ -84,3 +84,43 @@ func (db *DB) InsertIPLDStm() string {
|
||||
func (db *DB) InsertIPLDsStm() string {
|
||||
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
|
||||
}
|
||||
|
||||
func (db *DB) LogTableName() []string {
|
||||
return []string{"eth", "log_cids"}
|
||||
}
|
||||
|
||||
func (db *DB) LogColumnNames() []string {
|
||||
return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
|
||||
}
|
||||
|
||||
func (db *DB) RctTableName() []string {
|
||||
return []string{"eth", "receipt_cids"}
|
||||
}
|
||||
|
||||
func (db *DB) RctColumnNames() []string {
|
||||
return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
|
||||
}
|
||||
|
||||
func (db *DB) StateTableName() []string {
|
||||
return []string{"eth", "state_cids"}
|
||||
}
|
||||
|
||||
func (db *DB) StateColumnNames() []string {
|
||||
return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
|
||||
}
|
||||
|
||||
func (db *DB) StorageTableName() []string {
|
||||
return []string{"eth", "storage_cids"}
|
||||
}
|
||||
|
||||
func (db *DB) StorageColumnNames() []string {
|
||||
return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
|
||||
}
|
||||
|
||||
func (db *DB) TxTableName() []string {
|
||||
return []string{"eth", "transaction_cids"}
|
||||
}
|
||||
|
||||
func (db *DB) TxColumnNames() []string {
|
||||
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
)
|
||||
@ -37,6 +38,7 @@ type PGXDriver struct {
|
||||
pool *pgxpool.Pool
|
||||
nodeInfo node.Info
|
||||
nodeID string
|
||||
config Config
|
||||
}
|
||||
|
||||
// ConnectPGX initializes and returns a PGX connection pool
|
||||
@ -55,7 +57,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive
|
||||
if err != nil {
|
||||
return nil, ErrDBConnectionFailed(err)
|
||||
}
|
||||
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node}
|
||||
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
|
||||
nodeErr := pg.createNode()
|
||||
if nodeErr != nil {
|
||||
return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
|
||||
@ -144,7 +146,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
|
||||
return pgxTxWrapper{tx: tx}, nil
|
||||
}
|
||||
|
||||
func (pgx *PGXDriver) Stats() sql.Stats {
|
||||
func (pgx *PGXDriver) Stats() metrics.DbStats {
|
||||
stats := pgx.pool.Stat()
|
||||
return pgxStatsWrapper{stats: stats}
|
||||
}
|
||||
@ -165,6 +167,11 @@ func (pgx *PGXDriver) Context() context.Context {
|
||||
return pgx.ctx
|
||||
}
|
||||
|
||||
// HasCopy satisfies sql.Database
|
||||
func (pgx *PGXDriver) UseCopyFrom() bool {
|
||||
return pgx.config.CopyFrom
|
||||
}
|
||||
|
||||
type resultWrapper struct {
|
||||
ct pgconn.CommandTag
|
||||
}
|
||||
@ -178,43 +185,43 @@ type pgxStatsWrapper struct {
|
||||
stats *pgxpool.Stat
|
||||
}
|
||||
|
||||
// MaxOpen satisfies sql.Stats
|
||||
// MaxOpen satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) MaxOpen() int64 {
|
||||
return int64(s.stats.MaxConns())
|
||||
}
|
||||
|
||||
// Open satisfies sql.Stats
|
||||
// Open satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) Open() int64 {
|
||||
return int64(s.stats.TotalConns())
|
||||
}
|
||||
|
||||
// InUse satisfies sql.Stats
|
||||
// InUse satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) InUse() int64 {
|
||||
return int64(s.stats.AcquiredConns())
|
||||
}
|
||||
|
||||
// Idle satisfies sql.Stats
|
||||
// Idle satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) Idle() int64 {
|
||||
return int64(s.stats.IdleConns())
|
||||
}
|
||||
|
||||
// WaitCount satisfies sql.Stats
|
||||
// WaitCount satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) WaitCount() int64 {
|
||||
return s.stats.EmptyAcquireCount()
|
||||
}
|
||||
|
||||
// WaitDuration satisfies sql.Stats
|
||||
// WaitDuration satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) WaitDuration() time.Duration {
|
||||
return s.stats.AcquireDuration()
|
||||
}
|
||||
|
||||
// MaxIdleClosed satisfies sql.Stats
|
||||
// MaxIdleClosed satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) MaxIdleClosed() int64 {
|
||||
// this stat isn't supported by pgxpool, but we don't want to panic
|
||||
return 0
|
||||
}
|
||||
|
||||
// MaxLifetimeClosed satisfies sql.Stats
|
||||
// MaxLifetimeClosed satisfies metrics.DbStats
|
||||
func (s pgxStatsWrapper) MaxLifetimeClosed() int64 {
|
||||
return s.stats.CanceledAcquireCount()
|
||||
}
|
||||
@ -243,3 +250,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error {
|
||||
func (t pgxTxWrapper) Rollback(ctx context.Context) error {
|
||||
return t.tx.Rollback(ctx)
|
||||
}
|
||||
|
||||
func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
|
||||
return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows))
|
||||
}
|
||||
|
||||
@ -19,10 +19,12 @@ package postgres
|
||||
import (
|
||||
"context"
|
||||
coresql "database/sql"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
)
|
||||
@ -109,7 +111,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) {
|
||||
return sqlxTxWrapper{tx: tx}, nil
|
||||
}
|
||||
|
||||
func (driver *SQLXDriver) Stats() sql.Stats {
|
||||
func (driver *SQLXDriver) Stats() metrics.DbStats {
|
||||
stats := driver.db.Stats()
|
||||
return sqlxStatsWrapper{stats: stats}
|
||||
}
|
||||
@ -129,46 +131,52 @@ func (driver *SQLXDriver) Context() context.Context {
|
||||
return driver.ctx
|
||||
}
|
||||
|
||||
// HasCopy satisfies sql.Database
|
||||
func (driver *SQLXDriver) UseCopyFrom() bool {
|
||||
// sqlx does not currently support COPY.
|
||||
return false
|
||||
}
|
||||
|
||||
type sqlxStatsWrapper struct {
|
||||
stats coresql.DBStats
|
||||
}
|
||||
|
||||
// MaxOpen satisfies sql.Stats
|
||||
// MaxOpen satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) MaxOpen() int64 {
|
||||
return int64(s.stats.MaxOpenConnections)
|
||||
}
|
||||
|
||||
// Open satisfies sql.Stats
|
||||
// Open satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) Open() int64 {
|
||||
return int64(s.stats.OpenConnections)
|
||||
}
|
||||
|
||||
// InUse satisfies sql.Stats
|
||||
// InUse satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) InUse() int64 {
|
||||
return int64(s.stats.InUse)
|
||||
}
|
||||
|
||||
// Idle satisfies sql.Stats
|
||||
// Idle satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) Idle() int64 {
|
||||
return int64(s.stats.Idle)
|
||||
}
|
||||
|
||||
// WaitCount satisfies sql.Stats
|
||||
// WaitCount satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) WaitCount() int64 {
|
||||
return s.stats.WaitCount
|
||||
}
|
||||
|
||||
// WaitDuration satisfies sql.Stats
|
||||
// WaitDuration satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) WaitDuration() time.Duration {
|
||||
return s.stats.WaitDuration
|
||||
}
|
||||
|
||||
// MaxIdleClosed satisfies sql.Stats
|
||||
// MaxIdleClosed satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) MaxIdleClosed() int64 {
|
||||
return s.stats.MaxIdleClosed
|
||||
}
|
||||
|
||||
// MaxLifetimeClosed satisfies sql.Stats
|
||||
// MaxLifetimeClosed satisfies metrics.DbStats
|
||||
func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 {
|
||||
return s.stats.MaxLifetimeClosed
|
||||
}
|
||||
@ -196,3 +204,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error {
|
||||
func (t sqlxTxWrapper) Rollback(ctx context.Context) error {
|
||||
return t.tx.Rollback()
|
||||
}
|
||||
|
||||
func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
|
||||
return 0, errors.New("Unsupported Operation")
|
||||
}
|
||||
|
||||
@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) {
|
||||
}
|
||||
|
||||
// SetupPGXDB is used to setup a pgx db for tests
|
||||
func SetupPGXDB() (sql.Database, error) {
|
||||
driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{})
|
||||
func SetupPGXDB(config Config) (sql.Database, error) {
|
||||
driver, err := NewPGXDriver(context.Background(), config, node.Info{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -18,9 +18,11 @@ package sql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/lib/pq"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
)
|
||||
|
||||
@ -66,7 +68,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
||||
if err != nil {
|
||||
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
|
||||
}
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -94,20 +96,39 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr
|
||||
ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING
|
||||
*/
|
||||
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
|
||||
transaction.BlockNumber,
|
||||
transaction.HeaderID,
|
||||
transaction.TxHash,
|
||||
transaction.CID,
|
||||
transaction.Dst,
|
||||
transaction.Src,
|
||||
transaction.Index,
|
||||
transaction.Type,
|
||||
transaction.Value)
|
||||
if err != nil {
|
||||
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
|
||||
if w.useCopyForTx(tx) {
|
||||
blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.transaction_cids", err, "COPY", transaction}
|
||||
}
|
||||
|
||||
value, err := strconv.ParseFloat(transaction.Value, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.transaction_cids", err, "COPY", transaction}
|
||||
}
|
||||
|
||||
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
|
||||
toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
|
||||
transaction.Src, transaction.Index, int(transaction.Type), value)))
|
||||
if err != nil {
|
||||
return insertError{"eth.transaction_cids", err, "COPY", transaction}
|
||||
}
|
||||
} else {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
|
||||
transaction.BlockNumber,
|
||||
transaction.HeaderID,
|
||||
transaction.TxHash,
|
||||
transaction.CID,
|
||||
transaction.Dst,
|
||||
transaction.Src,
|
||||
transaction.Index,
|
||||
transaction.Type,
|
||||
transaction.Value)
|
||||
if err != nil {
|
||||
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
|
||||
}
|
||||
}
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -116,18 +137,32 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, pos
|
||||
ON CONFLICT (tx_id, header_id, block_number) DO NOTHING
|
||||
*/
|
||||
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
|
||||
rct.BlockNumber,
|
||||
rct.HeaderID,
|
||||
rct.TxID,
|
||||
rct.CID,
|
||||
rct.Contract,
|
||||
rct.PostState,
|
||||
rct.PostStatus)
|
||||
if err != nil {
|
||||
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
|
||||
if w.useCopyForTx(tx) {
|
||||
blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.receipt_cids", err, "COPY", rct}
|
||||
}
|
||||
|
||||
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
|
||||
toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
|
||||
rct.PostState, int(rct.PostStatus))))
|
||||
if err != nil {
|
||||
return insertError{"eth.receipt_cids", err, "COPY", rct}
|
||||
}
|
||||
} else {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
|
||||
rct.BlockNumber,
|
||||
rct.HeaderID,
|
||||
rct.TxID,
|
||||
rct.CID,
|
||||
rct.Contract,
|
||||
rct.PostState,
|
||||
rct.PostStatus)
|
||||
if err != nil {
|
||||
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
|
||||
}
|
||||
}
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -136,22 +171,42 @@ INSERT INTO eth.log_cids (block_number, header_id, cid, rct_id, address, index,
|
||||
ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING
|
||||
*/
|
||||
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
||||
for _, log := range logs {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
|
||||
log.BlockNumber,
|
||||
log.HeaderID,
|
||||
log.CID,
|
||||
log.ReceiptID,
|
||||
log.Address,
|
||||
log.Index,
|
||||
log.Topic0,
|
||||
log.Topic1,
|
||||
log.Topic2,
|
||||
log.Topic3)
|
||||
if err != nil {
|
||||
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
|
||||
if w.useCopyForTx(tx) {
|
||||
var rows [][]interface{}
|
||||
for _, log := range logs {
|
||||
blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.log_cids", err, "COPY", log}
|
||||
}
|
||||
|
||||
rows = append(rows, toRow(blockNum, log.HeaderID, log.CID, log.ReceiptID,
|
||||
log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
|
||||
}
|
||||
if nil != rows && len(rows) >= 0 {
|
||||
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
|
||||
if err != nil {
|
||||
return insertError{"eth.log_cids", err, "COPY", rows}
|
||||
}
|
||||
metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows)))
|
||||
}
|
||||
} else {
|
||||
for _, log := range logs {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
|
||||
log.BlockNumber,
|
||||
log.HeaderID,
|
||||
log.CID,
|
||||
log.ReceiptID,
|
||||
log.Address,
|
||||
log.Index,
|
||||
log.Topic0,
|
||||
log.Topic1,
|
||||
log.Topic2,
|
||||
log.Topic3)
|
||||
if err != nil {
|
||||
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
|
||||
}
|
||||
metrics.IndexerMetrics.LogsCounter.Inc(1)
|
||||
}
|
||||
indexerMetrics.logs.Inc(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -165,20 +220,39 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
||||
if stateNode.Removed {
|
||||
balance = "0"
|
||||
}
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
|
||||
stateNode.BlockNumber,
|
||||
stateNode.HeaderID,
|
||||
stateNode.StateKey,
|
||||
stateNode.CID,
|
||||
true,
|
||||
balance,
|
||||
stateNode.Nonce,
|
||||
stateNode.CodeHash,
|
||||
stateNode.StorageRoot,
|
||||
stateNode.Removed,
|
||||
)
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
|
||||
|
||||
if w.useCopyForTx(tx) {
|
||||
blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
||||
}
|
||||
balInt, err := strconv.ParseInt(balance, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
||||
}
|
||||
|
||||
_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
|
||||
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
|
||||
true, balInt, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, "COPY", stateNode}
|
||||
}
|
||||
} else {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
|
||||
stateNode.BlockNumber,
|
||||
stateNode.HeaderID,
|
||||
stateNode.StateKey,
|
||||
stateNode.CID,
|
||||
true,
|
||||
balance,
|
||||
stateNode.Nonce,
|
||||
stateNode.CodeHash,
|
||||
stateNode.StorageRoot,
|
||||
stateNode.Removed,
|
||||
)
|
||||
if err != nil {
|
||||
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -188,22 +262,57 @@ INSERT INTO eth.storage_cids (block_number, header_id, state_leaf_key, storage_l
|
||||
ON CONFLICT (header_id, state_leaf_key, storage_leaf_key, block_number) DO NOTHING
|
||||
*/
|
||||
func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
|
||||
storageCID.BlockNumber,
|
||||
storageCID.HeaderID,
|
||||
storageCID.StateKey,
|
||||
storageCID.StorageKey,
|
||||
storageCID.CID,
|
||||
true,
|
||||
storageCID.Value,
|
||||
storageCID.Removed,
|
||||
)
|
||||
if err != nil {
|
||||
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
|
||||
if w.useCopyForTx(tx) {
|
||||
blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64)
|
||||
if err != nil {
|
||||
return insertError{"eth.storage_cids", err, "COPY", storageCID}
|
||||
}
|
||||
|
||||
_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
|
||||
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
|
||||
true, storageCID.Value, storageCID.Removed)))
|
||||
if err != nil {
|
||||
return insertError{"eth.storage_cids", err, "COPY", storageCID}
|
||||
}
|
||||
} else {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
|
||||
storageCID.BlockNumber,
|
||||
storageCID.HeaderID,
|
||||
storageCID.StateKey,
|
||||
storageCID.StorageKey,
|
||||
storageCID.CID,
|
||||
true,
|
||||
storageCID.Value,
|
||||
storageCID.Removed,
|
||||
)
|
||||
if err != nil {
|
||||
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) useCopyForTx(tx Tx) bool {
|
||||
// Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations
|
||||
// can be collected over time and then all submitted within in a single TX.
|
||||
if _, ok := tx.(*DelayedTx); ok {
|
||||
return w.db.UseCopyFrom()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// combine args into a row
|
||||
func toRow(args ...interface{}) []interface{} {
|
||||
var row []interface{}
|
||||
row = append(row, args...)
|
||||
return row
|
||||
}
|
||||
|
||||
// combine row (or rows) into a slice of rows for CopyFrom
|
||||
func toRows(rows ...[]interface{}) [][]interface{} {
|
||||
return rows
|
||||
}
|
||||
|
||||
type insertError struct {
|
||||
table string
|
||||
err error
|
||||
|
||||
@ -26,8 +26,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
@ -43,9 +41,11 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
"github.com/thoas/go-funk"
|
||||
)
|
||||
|
||||
@ -794,15 +794,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
}
|
||||
|
||||
output := func(node types2.StateLeafNode) error {
|
||||
defer func() {
|
||||
// This is very noisy so we log at Trace.
|
||||
since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
|
||||
logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds()))
|
||||
}()
|
||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||
}
|
||||
ipldOutput := func(c types2.IPLD) error {
|
||||
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer)
|
||||
return sds.indexer.PushIPLD(tx, c)
|
||||
}
|
||||
|
||||
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
|
||||
err = sds.Builder.WriteStateDiffObject(Args{
|
||||
NewStateRoot: block.Root(),
|
||||
OldStateRoot: parentRoot,
|
||||
BlockHash: block.Hash(),
|
||||
BlockNumber: block.Number(),
|
||||
}, params, output, ipldOutput)
|
||||
// TODO this anti-pattern needs to be sorted out eventually
|
||||
if err := tx.Submit(err); err != nil {
|
||||
|
||||
@ -16,7 +16,6 @@
|
||||
|
||||
package statediff_test
|
||||
|
||||
/*
|
||||
import (
|
||||
"bytes"
|
||||
"math/big"
|
||||
@ -438,4 +437,3 @@ func testGetSyncStatus(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@ -44,8 +44,8 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
|
||||
}
|
||||
|
||||
// BuildStateDiffObject mock method
|
||||
func (builder *Builder) WriteStateDiffObject(args sdtypes.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
|
||||
builder.StateRoots = args
|
||||
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
|
||||
builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}
|
||||
builder.Params = params
|
||||
|
||||
return builder.builderError
|
||||
|
||||
@ -22,12 +22,16 @@ package trie_helpers
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
// SortKeys sorts the keys in the account map
|
||||
func SortKeys(data types.AccountMap) []string {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
|
||||
keys := make([]string, 0, len(data))
|
||||
for key := range data {
|
||||
keys = append(keys, key)
|
||||
@ -41,6 +45,7 @@ func SortKeys(data types.AccountMap) []string {
|
||||
// a and b must first be sorted
|
||||
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
|
||||
func FindIntersection(a, b []string) []string {
|
||||
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
|
||||
lenA := len(a)
|
||||
lenB := len(b)
|
||||
iOfA, iOfB := 0, 0
|
||||
|
||||
@ -20,6 +20,9 @@ import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
@ -584,6 +587,7 @@ func (it *differenceIterator) AddResolver(resolver ethdb.KeyValueReader) {
|
||||
}
|
||||
|
||||
func (it *differenceIterator) Next(bool) bool {
|
||||
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DifferenceIteratorNextTimer)
|
||||
// Invariants:
|
||||
// - We always advance at least one element in b.
|
||||
// - At the start of this function, a's path is lexically greater than b's.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user