Roy Crihfield
b8fec4b571
Adds a method to perform full-state snapshots by diffing against an empty state trie. This replicates the functionality of `ipld-eth-state-snapshot`, so that code can use this as a library; see: cerc-io/ipld-eth-state-snapshot#1. Note that due to how incremental diffs are processed (updates are processed after the trie has been traversed), the iterator state doesn't fully capture the progress of the diff, so it's not currently feasible to perform state diffs this way. Full snapshots don't have to worry about updated accounts, so we can support them. Co-authored-by: Thomas E Lackey <telackey@bozemanpass.com> Reviewed-on: #15
111 lines
2.8 KiB
Go
111 lines
2.8 KiB
Go
package sql
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
|
|
"github.com/cerc-io/plugeth-statediff/utils/log"
|
|
)
|
|
|
|
// copyFromCheckLimit is the maximum number of trailing cache entries that
// CopyFrom searches for an earlier COPY with the same table and columns, so
// that new rows can be appended to it instead of queuing a separate COPY.
// Setting it to 1 would mean only back-to-back COPYs get combined.
const (
	copyFromCheckLimit = 100
)
|
|
|
|
type DelayedTx struct {
|
|
cache []interface{}
|
|
db Database
|
|
}
|
|
// cachedStmt is a queued SQL statement together with its arguments, recorded
// so that it can be executed later.
type cachedStmt struct {
	// sql is the statement text.
	sql string
	// args are the positional arguments passed along with sql.
	args []interface{}
}
|
|
|
|
// copyFrom is a queued bulk COPY: the destination table, the column list and
// the rows accumulated for it so far.
type copyFrom struct {
	// tableName is the (possibly multi-part) identifier of the target table.
	tableName []string
	// columnNames lists the columns each row provides values for.
	columnNames []string
	// rows are the pending rows, one []interface{} per row.
	rows [][]interface{}
}

// appendRows adds the given rows to the end of the rows already queued.
func (cf *copyFrom) appendRows(rows [][]interface{}) {
	merged := append(cf.rows, rows...)
	cf.rows = merged
}

// matches reports whether this COPY targets exactly the given table and
// column list. Both are compared with reflect.DeepEqual, so element order
// matters and a nil slice is not equal to an empty non-nil slice.
func (cf *copyFrom) matches(tableName []string, columnNames []string) bool {
	if !reflect.DeepEqual(cf.tableName, tableName) {
		return false
	}
	return reflect.DeepEqual(cf.columnNames, columnNames)
}
|
|
|
|
func NewDelayedTx(db Database) *DelayedTx {
|
|
return &DelayedTx{db: db}
|
|
}
|
|
|
|
func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow {
|
|
return tx.db.QueryRow(ctx, sql, args...)
|
|
}
|
|
|
|
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
|
|
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
|
|
prevCopy, ok := tx.cache[pos].(*copyFrom)
|
|
if ok && prevCopy.matches(tableName, columnNames) {
|
|
return prevCopy, count
|
|
}
|
|
}
|
|
return nil, -1
|
|
}
|
|
|
|
func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) {
|
|
if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy {
|
|
log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName,
|
|
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
|
|
prevCopy.appendRows(rows)
|
|
} else {
|
|
tx.cache = append(tx.cache, ©From{tableName, columnNames, rows})
|
|
}
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
|
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
|
return nil, nil
|
|
}
|
|
|
|
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|
t := time.Now()
|
|
base, err := tx.db.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
metrics.IndexerMetrics.FreePostgresTimer.Update(time.Since(t))
|
|
defer func() {
|
|
if p := recover(); p != nil {
|
|
rollback(ctx, base)
|
|
panic(p)
|
|
} else if err != nil {
|
|
rollback(ctx, base)
|
|
}
|
|
}()
|
|
for _, item := range tx.cache {
|
|
switch item := item.(type) {
|
|
case *copyFrom:
|
|
_, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
|
if err != nil {
|
|
log.Error("COPY error", "table", item.tableName, "error", err)
|
|
return err
|
|
}
|
|
case cachedStmt:
|
|
_, err = base.Exec(ctx, item.sql, item.args...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
tx.cache = nil
|
|
return base.Commit(ctx)
|
|
}
|
|
|
|
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
|
tx.cache = nil
|
|
return nil
|
|
}
|