remove notifee after subscribing

This commit is contained in:
vyzo 2020-09-07 13:17:48 +03:00
parent 05740e0f64
commit 4f0ab4f226
2 changed files with 29 additions and 9 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"os" "os"
"strconv" "strconv"
@ -51,6 +52,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
var DefaultTipSetCacheSize = 8192 var DefaultTipSetCacheSize = 8192
var DefaultMsgMetaCacheSize = 2048 var DefaultMsgMetaCacheSize = 2048
var ErrNotifieeDone = errors.New("notifee is done and should be removed")
func init() { func init() {
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
tscs, err := strconv.Atoi(s) tscs, err := strconv.Atoi(s)
@ -358,11 +361,33 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
apply[i], apply[opp] = apply[opp], apply[i] apply[i], apply[opp] = apply[opp], apply[i]
} }
for _, hcf := range notifees { var toremove map[int]struct{}
if err := hcf(revert, apply); err != nil { for i, hcf := range notifees {
log.Error("head change func errored (BAD): ", err) err := hcf(revert, apply)
if err != nil {
if err == ErrNotifieeDone {
if toremove == nil {
toremove = make(map[int]struct{})
}
toremove[i] = struct{}{}
} else {
log.Error("head change func errored (BAD): ", err)
}
} }
} }
if len(toremove) > 0 {
newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove))
for i, hcf := range notifees {
_, remove := toremove[i]
if remove {
continue
}
newNotifees = append(newNotifees, hcf)
}
notifees = newNotifees
}
case <-ctx.Done(): case <-ctx.Done():
return return
} }

View File

@ -89,12 +89,7 @@ func waitForSync(stmgr *stmgr.StateManager, blocks int, subscribe func()) {
} }
// we are not synced, subscribe to head changes and wait for sync // we are not synced, subscribe to head changes and wait for sync
subscribed := false
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
if subscribed {
return nil
}
if len(app) == 0 { if len(app) == 0 {
return nil return nil
} }
@ -110,7 +105,7 @@ func waitForSync(stmgr *stmgr.StateManager, blocks int, subscribe func()) {
now := uint64(build.Clock.Now().UnixNano()) now := uint64(build.Clock.Now().UnixNano())
if latest > now-nearsync { if latest > now-nearsync {
subscribe() subscribe()
subscribed = true return store.ErrNotifieeDone
} }
return nil return nil