sql indexer ipld cache - confirm quit
prevents negative waitgroup panic
This commit is contained in:
parent
8fa061ded5
commit
3d403176c7
@ -36,7 +36,7 @@ type BatchTx struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
dbtx Tx
|
dbtx Tx
|
||||||
stm string
|
stm string
|
||||||
quit chan struct{}
|
quit chan (chan<- struct{})
|
||||||
iplds chan models.IPLDModel
|
iplds chan models.IPLDModel
|
||||||
ipldCache models.IPLDBatch
|
ipldCache models.IPLDBatch
|
||||||
removedCacheFlag *uint32
|
removedCacheFlag *uint32
|
||||||
@ -81,8 +81,9 @@ func (tx *BatchTx) cache() {
|
|||||||
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
|
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
|
||||||
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
|
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
|
||||||
tx.cacheWg.Done()
|
tx.cacheWg.Done()
|
||||||
case <-tx.quit:
|
case confirm := <-tx.quit:
|
||||||
tx.ipldCache = models.IPLDBatch{}
|
tx.ipldCache = models.IPLDBatch{}
|
||||||
|
confirm <- struct{}{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,7 +129,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
BlockNumber: block.Number().String(),
|
BlockNumber: block.Number().String(),
|
||||||
stm: sdi.dbWriter.db.InsertIPLDsStm(),
|
stm: sdi.dbWriter.db.InsertIPLDsStm(),
|
||||||
iplds: make(chan models.IPLDModel),
|
iplds: make(chan models.IPLDModel),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan (chan<- struct{})),
|
||||||
ipldCache: models.IPLDBatch{
|
ipldCache: models.IPLDBatch{
|
||||||
BlockNumbers: make([]string, 0, startingCacheCapacity),
|
BlockNumbers: make([]string, 0, startingCacheCapacity),
|
||||||
Keys: make([]string, 0, startingCacheCapacity),
|
Keys: make([]string, 0, startingCacheCapacity),
|
||||||
@ -139,7 +139,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
// handle transaction commit or rollback for any return case
|
// handle transaction commit or rollback for any return case
|
||||||
submit: func(self *BatchTx, err error) error {
|
submit: func(self *BatchTx, err error) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
confirm := make(chan struct{})
|
||||||
|
self.quit <- confirm
|
||||||
close(self.quit)
|
close(self.quit)
|
||||||
|
<-confirm
|
||||||
close(self.iplds)
|
close(self.iplds)
|
||||||
}()
|
}()
|
||||||
if p := recover(); p != nil {
|
if p := recover(); p != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user