Port: Fix deferred rollback on error

This commit is contained in:
Roy Crihfield 2023-06-21 21:32:07 +08:00
parent ebc2eb37e7
commit 49e0beb389
3 changed files with 10 additions and 9 deletions

View File

@ -84,13 +84,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
for _, item := range tx.cache { for _, item := range tx.cache {
switch item := item.(type) { switch item := item.(type) {
case *copyFrom: case *copyFrom:
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) _, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
if err != nil { if err != nil {
log.Error("COPY error", "table", item.tableName, "error", err) log.Error("COPY error", "table", item.tableName, "error", err)
return err return err
} }
case cachedStmt: case cachedStmt:
_, err := base.Exec(ctx, item.sql, item.args...) _, err = base.Exec(ctx, item.sql, item.args...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -38,19 +38,20 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
return start, logger return start, logger
} }
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration { func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
duration := time.Since(start) duration := time.Since(start)
defaultStatediffMetrics.underway.Dec(1) defaultStatediffMetrics.underway.Dec(1)
if nil == err { failed := nil != err && nil != *err
defaultStatediffMetrics.succeeded.Inc(1) if failed {
} else {
defaultStatediffMetrics.failed.Inc(1) defaultStatediffMetrics.failed.Inc(1)
} else {
defaultStatediffMetrics.succeeded.Inc(1)
} }
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds()) defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
logger.Debug("writeStateDiff END", logger.Debug("writeStateDiff END",
"duration", duration, "duration", duration,
"error", err != nil, "error", failed,
"underway", defaultStatediffMetrics.underway.Count(), "underway", defaultStatediffMetrics.underway.Count(),
"succeeded", defaultStatediffMetrics.succeeded.Count(), "succeeded", defaultStatediffMetrics.succeeded.Count(),
"failed", defaultStatediffMetrics.failed.Count(), "failed", defaultStatediffMetrics.failed.Count(),

View File

@ -681,7 +681,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
var err error var err error
var tx interfaces.Batch var tx interfaces.Batch
start, logger := countStateDiffBegin(block) start, logger := countStateDiffBegin(block)
defer countStateDiffEnd(start, logger, err) defer countStateDiffEnd(start, logger, &err)
if sds.indexer == nil { if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs") return fmt.Errorf("indexer is not set; cannot write indexed diffs")
} }
@ -716,7 +716,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}, params, output, ipldOutput) }, params, output, ipldOutput)
// TODO this anti-pattern needs to be sorted out eventually // TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil { if err = tx.Submit(err); err != nil {
return fmt.Errorf("batch transaction submission failed: %w", err) return fmt.Errorf("batch transaction submission failed: %w", err)
} }