diff --git a/chain/store/store.go b/chain/store/store.go index dba5995de..f0fb2a611 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -48,6 +48,9 @@ var log = logging.Logger("chainstore") var chainHeadKey = dstore.NewKey("head") +// ReorgNotifee represents a callback that gets called upon reorgs. +type ReorgNotifee func(rev, app []*types.TipSet) error + type ChainStore struct { bs bstore.Blockstore ds dstore.Datastore @@ -63,8 +66,8 @@ type ChainStore struct { cindex *ChainIndex - reorgCh chan<- reorg - headChangeNotifs []func(rev, app []*types.TipSet) error + reorgCh chan<- reorg + reorgNotifeeCh chan ReorgNotifee mmCache *lru.ARCCache tsCache *lru.ARCCache @@ -89,8 +92,6 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys cs.cindex = ci - cs.reorgCh = cs.reorgWorker(context.TODO()) - hcnf := func(rev, app []*types.TipSet) error { cs.pubLk.Lock() defer cs.pubLk.Unlock() @@ -122,7 +123,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys return nil } - cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric) + cs.reorgNotifeeCh = make(chan ReorgNotifee) + cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric}) return cs } @@ -211,8 +213,8 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange return out } -func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) { - cs.headChangeNotifs = append(cs.headChangeNotifs, f) +func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) { + cs.reorgNotifeeCh <- f } func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error { @@ -273,13 +275,19 @@ type reorg struct { new *types.TipSet } -func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { +func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNotifee) chan<- reorg { out := make(chan reorg, 32) + notifees := make([]ReorgNotifee, len(initialNotifees)) + copy(notifees, initialNotifees) + go func() { defer log.Warn("reorgWorker quit") for { select { + case n := <-cs.reorgNotifeeCh: + notifees = append(notifees, n) + case r := <-out: revert, apply, err := cs.ReorgOps(r.old, r.new) if err != nil { @@ -293,7 +301,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { apply[i], apply[opp] = apply[opp], apply[i] } - for _, hcf := range cs.headChangeNotifs { + for _, hcf := range notifees { if err := hcf(revert, apply); err != nil { log.Error("head change func errored (BAD): ", err) }