367: Fix deferred rollback on error. #391
@ -84,13 +84,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
for _, item := range tx.cache {
|
for _, item := range tx.cache {
|
||||||
switch item := item.(type) {
|
switch item := item.(type) {
|
||||||
case *copyFrom:
|
case *copyFrom:
|
||||||
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
_, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("COPY error", "table", item.tableName, "err", err)
|
log.Error("COPY error", "table", item.tableName, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case cachedStmt:
|
case cachedStmt:
|
||||||
_, err := base.Exec(ctx, item.sql, item.args...)
|
_, err = base.Exec(ctx, item.sql, item.args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -39,18 +39,19 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
|||||||
return start, logger
|
return start, logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration {
|
func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
defaultStatediffMetrics.underway.Dec(1)
|
defaultStatediffMetrics.underway.Dec(1)
|
||||||
if nil == err {
|
failed := nil != err && nil != *err
|
||||||
defaultStatediffMetrics.succeeded.Inc(1)
|
if failed {
|
||||||
} else {
|
|
||||||
defaultStatediffMetrics.failed.Inc(1)
|
defaultStatediffMetrics.failed.Inc(1)
|
||||||
|
} else {
|
||||||
|
defaultStatediffMetrics.succeeded.Inc(1)
|
||||||
}
|
}
|
||||||
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
||||||
|
|
||||||
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||||
duration.Milliseconds(), nil != err,
|
duration.Milliseconds(), failed,
|
||||||
defaultStatediffMetrics.underway.Count(),
|
defaultStatediffMetrics.underway.Count(),
|
||||||
defaultStatediffMetrics.succeeded.Count(),
|
defaultStatediffMetrics.succeeded.Count(),
|
||||||
defaultStatediffMetrics.failed.Count(),
|
defaultStatediffMetrics.failed.Count(),
|
||||||
|
@ -815,7 +815,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
var err error
|
var err error
|
||||||
var tx interfaces.Batch
|
var tx interfaces.Batch
|
||||||
start, logger := countStateDiffBegin(block)
|
start, logger := countStateDiffBegin(block)
|
||||||
defer countStateDiffEnd(start, logger, err)
|
defer countStateDiffEnd(start, logger, &err)
|
||||||
if params.IncludeTD {
|
if params.IncludeTD {
|
||||||
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
|
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
|
||||||
}
|
}
|
||||||
@ -847,7 +847,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
BlockNumber: block.Number(),
|
BlockNumber: block.Number(),
|
||||||
}, params, output, ipldOutput)
|
}, params, output, ipldOutput)
|
||||||
// TODO this anti-pattern needs to be sorted out eventually
|
// TODO this anti-pattern needs to be sorted out eventually
|
||||||
if err := tx.Submit(err); err != nil {
|
if err = tx.Submit(err); err != nil {
|
||||||
return fmt.Errorf("batch transaction submission failed: %w", err)
|
return fmt.Errorf("batch transaction submission failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user