Port: Fix deferred rollback on error
This commit is contained in:
parent
83b0c11b25
commit
b6fc15dd81
@ -84,13 +84,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
||||
for _, item := range tx.cache {
|
||||
switch item := item.(type) {
|
||||
case *copyFrom:
|
||||
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
||||
_, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
||||
if err != nil {
|
||||
log.Error("COPY error", "table", item.tableName, "error", err)
|
||||
return err
|
||||
}
|
||||
case cachedStmt:
|
||||
_, err := base.Exec(ctx, item.sql, item.args...)
|
||||
_, err = base.Exec(ctx, item.sql, item.args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -38,19 +38,20 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
||||
return start, logger
|
||||
}
|
||||
|
||||
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration {
|
||||
func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
|
||||
duration := time.Since(start)
|
||||
defaultStatediffMetrics.underway.Dec(1)
|
||||
if nil == err {
|
||||
defaultStatediffMetrics.succeeded.Inc(1)
|
||||
} else {
|
||||
failed := nil != err && nil != *err
|
||||
if failed {
|
||||
defaultStatediffMetrics.failed.Inc(1)
|
||||
} else {
|
||||
defaultStatediffMetrics.succeeded.Inc(1)
|
||||
}
|
||||
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
||||
|
||||
logger.Debug("writeStateDiff END",
|
||||
"duration", duration,
|
||||
"error", err != nil,
|
||||
"error", failed,
|
||||
"underway", defaultStatediffMetrics.underway.Count(),
|
||||
"succeeded", defaultStatediffMetrics.succeeded.Count(),
|
||||
"failed", defaultStatediffMetrics.failed.Count(),
|
||||
|
@ -681,7 +681,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
var err error
|
||||
var tx interfaces.Batch
|
||||
start, logger := countStateDiffBegin(block)
|
||||
defer countStateDiffEnd(start, logger, err)
|
||||
defer countStateDiffEnd(start, logger, &err)
|
||||
if sds.indexer == nil {
|
||||
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
|
||||
}
|
||||
@ -716,7 +716,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
}, params, output, ipldOutput)
|
||||
|
||||
// TODO this anti-pattern needs to be sorted out eventually
|
||||
if err := tx.Submit(err); err != nil {
|
||||
if err = tx.Submit(err); err != nil {
|
||||
return fmt.Errorf("batch transaction submission failed: %w", err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user