diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index e13300306..a8eacefcd 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -7,6 +7,9 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// Changing this to 1 would make sure only sequential COPYs were combined. +const COPY_FROM_CHECK_LIMIT = 100 + type DelayedTx struct { cache []interface{} db Database @@ -22,6 +25,14 @@ type copyFrom struct { rows [][]interface{} } +func (cf *copyFrom) appendRows(rows [][]interface{}) { + cf.rows = append(cf.rows, rows...) +} + +func (cf *copyFrom) matches(tableName []string, columnNames []string) bool { + return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames) +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -30,20 +41,23 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } -func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { - appendedToExisting := false - if len(tx.cache) > 0 { - prevCopy, ok := tx.cache[len(tx.cache)-1].(copyFrom) - if ok && reflect.DeepEqual(prevCopy.tableName, tableName) && reflect.DeepEqual(prevCopy.columnNames, columnNames) { - log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, - "current", len(prevCopy.rows), "append", len(rows)) - prevCopy.rows = append(prevCopy.rows, rows...) - appendedToExisting = true +func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) *copyFrom { + for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { + prevCopy, ok := tx.cache[pos].(*copyFrom) + if ok && prevCopy.matches(tableName, columnNames) { + return prevCopy } } + return nil +} - if !appendedToExisting { - tx.cache = append(tx.cache, copyFrom{tableName, columnNames, rows}) +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + if prevCopy := tx.findPrevCopyFrom(tableName, columnNames, COPY_FROM_CHECK_LIMIT); nil != prevCopy { + log.Info("statediff lazy_tx : Appending rows to COPY", "table", tableName, + "current", len(prevCopy.rows), "append", len(rows)) + prevCopy.appendRows(rows) + } else { + tx.cache = append(tx.cache, ©From{tableName, columnNames, rows}) } return 0, nil @@ -69,8 +83,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { }() for _, item := range tx.cache { switch item.(type) { - case copyFrom: - copy := item.(copyFrom) + case *copyFrom: + copy := item.(*copyFrom) log.Info("statediff lazy_tx : COPY", "table", copy.tableName, "rows", len(copy.rows)) _, err := base.CopyFrom(ctx, copy.tableName, copy.columnNames, copy.rows) if err != nil {