From 4f0ab4f2266759d555d05e68dfd515458ae88fb8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 7 Sep 2020 13:17:48 +0300 Subject: [PATCH] remove notifee after subscribing --- chain/store/store.go | 31 ++++++++++++++++++++++++++++--- node/modules/services.go | 7 +------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 2ae7fab2c..4bfd20040 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "encoding/json" + "errors" "io" "os" "strconv" @@ -51,6 +52,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") var DefaultTipSetCacheSize = 8192 var DefaultMsgMetaCacheSize = 2048 +var ErrNotifieeDone = errors.New("notifee is done and should be removed") + func init() { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { tscs, err := strconv.Atoi(s) @@ -358,11 +361,33 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo apply[i], apply[opp] = apply[opp], apply[i] } - for _, hcf := range notifees { - if err := hcf(revert, apply); err != nil { - log.Error("head change func errored (BAD): ", err) + var toremove map[int]struct{} + for i, hcf := range notifees { + err := hcf(revert, apply) + if err != nil { + if err == ErrNotifieeDone { + if toremove == nil { + toremove = make(map[int]struct{}) + } + toremove[i] = struct{}{} + } else { + log.Error("head change func errored (BAD): ", err) + } } } + + if len(toremove) > 0 { + newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove)) + for i, hcf := range notifees { + _, remove := toremove[i] + if remove { + continue + } + newNotifees = append(newNotifees, hcf) + } + notifees = newNotifees + } + case <-ctx.Done(): return } diff --git a/node/modules/services.go b/node/modules/services.go index a98613932..65e488d36 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -89,12 +89,7 @@ func waitForSync(stmgr *stmgr.StateManager, blocks int, subscribe func()) { } // we are not synced, subscribe to head changes and wait for sync - subscribed := false stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { - if subscribed { - return nil - } - if len(app) == 0 { return nil } @@ -110,7 +105,7 @@ func waitForSync(stmgr *stmgr.StateManager, blocks int, subscribe func()) { now := uint64(build.Clock.Now().UnixNano()) if latest > now-nearsync { subscribe() - subscribed = true + return store.ErrNotifieeDone } return nil