Fix concurrency and tests #4
@ -404,12 +404,18 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer tx.RollbackOnFailure(err)
|
|
||||||
// defer handling of commit/rollback for any return case
|
// defer handling of commit/rollback for any return case
|
||||||
|
defer tx.RollbackOnFailure(err)
|
||||||
|
|
||||||
|
var nodeMtx, ipldMtx sync.Mutex
|
||||||
output := func(node sdtypes.StateLeafNode) error {
|
output := func(node sdtypes.StateLeafNode) error {
|
||||||
|
nodeMtx.Lock()
|
||||||
|
defer nodeMtx.Unlock()
|
||||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||||
}
|
}
|
||||||
codeOutput := func(c sdtypes.IPLD) error {
|
ipldOutput := func(c sdtypes.IPLD) error {
|
||||||
|
ipldMtx.Lock()
|
||||||
|
defer ipldMtx.Unlock()
|
||||||
return sds.indexer.PushIPLD(tx, c)
|
return sds.indexer.PushIPLD(tx, c)
|
||||||
}
|
}
|
||||||
prom.SetTimeMetric(prom.T_BLOCK_PROCESSING, time.Now().Sub(t))
|
prom.SetTimeMetric(prom.T_BLOCK_PROCESSING, time.Now().Sub(t))
|
||||||
@ -419,7 +425,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
OldStateRoot: parentRoot,
|
OldStateRoot: parentRoot,
|
||||||
BlockNumber: block.Number(),
|
BlockNumber: block.Number(),
|
||||||
BlockHash: block.Hash(),
|
BlockHash: block.Hash(),
|
||||||
}, params, output, codeOutput)
|
}, params, output, ipldOutput)
|
||||||
prom.SetTimeMetric(prom.T_STATE_PROCESSING, time.Now().Sub(t))
|
prom.SetTimeMetric(prom.T_STATE_PROCESSING, time.Now().Sub(t))
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
err = tx.Submit()
|
err = tx.Submit()
|
||||||
|
Loading…
Reference in New Issue
Block a user