fix a potential race with chain reorgs notifees.
This commit is contained in:
parent
9fbc7aafb4
commit
4e9293ba04
@ -48,6 +48,9 @@ var log = logging.Logger("chainstore")
|
||||
|
||||
var chainHeadKey = dstore.NewKey("head")
|
||||
|
||||
// ReorgNotifee represents a callback that gets called upon reorgs.
|
||||
type ReorgNotifee func(rev, app []*types.TipSet) error
|
||||
|
||||
type ChainStore struct {
|
||||
bs bstore.Blockstore
|
||||
ds dstore.Datastore
|
||||
@ -63,8 +66,8 @@ type ChainStore struct {
|
||||
|
||||
cindex *ChainIndex
|
||||
|
||||
reorgCh chan<- reorg
|
||||
headChangeNotifs []func(rev, app []*types.TipSet) error
|
||||
reorgCh chan<- reorg
|
||||
reorgNotifeeCh chan ReorgNotifee
|
||||
|
||||
mmCache *lru.ARCCache
|
||||
tsCache *lru.ARCCache
|
||||
@ -89,8 +92,6 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys
|
||||
|
||||
cs.cindex = ci
|
||||
|
||||
cs.reorgCh = cs.reorgWorker(context.TODO())
|
||||
|
||||
hcnf := func(rev, app []*types.TipSet) error {
|
||||
cs.pubLk.Lock()
|
||||
defer cs.pubLk.Unlock()
|
||||
@ -122,7 +123,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys
|
||||
return nil
|
||||
}
|
||||
|
||||
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric)
|
||||
cs.reorgNotifeeCh = make(chan ReorgNotifee)
|
||||
cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric})
|
||||
|
||||
return cs
|
||||
}
|
||||
@ -211,8 +213,8 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange
|
||||
return out
|
||||
}
|
||||
|
||||
func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
|
||||
cs.headChangeNotifs = append(cs.headChangeNotifs, f)
|
||||
func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) {
|
||||
cs.reorgNotifeeCh <- f
|
||||
}
|
||||
|
||||
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
|
||||
@ -273,13 +275,19 @@ type reorg struct {
|
||||
new *types.TipSet
|
||||
}
|
||||
|
||||
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
|
||||
func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNotifee) chan<- reorg {
|
||||
out := make(chan reorg, 32)
|
||||
notifees := make([]ReorgNotifee, len(initialNotifees))
|
||||
copy(notifees, initialNotifees)
|
||||
|
||||
go func() {
|
||||
defer log.Warn("reorgWorker quit")
|
||||
|
||||
for {
|
||||
select {
|
||||
case n := <-cs.reorgNotifeeCh:
|
||||
notifees = append(notifees, n)
|
||||
|
||||
case r := <-out:
|
||||
revert, apply, err := cs.ReorgOps(r.old, r.new)
|
||||
if err != nil {
|
||||
@ -293,7 +301,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
|
||||
apply[i], apply[opp] = apply[opp], apply[i]
|
||||
}
|
||||
|
||||
for _, hcf := range cs.headChangeNotifs {
|
||||
for _, hcf := range notifees {
|
||||
if err := hcf(revert, apply); err != nil {
|
||||
log.Error("head change func errored (BAD): ", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user