diff --git a/chain/store/store.go b/chain/store/store.go index 00a78500e..cf934f257 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "encoding/json" + "errors" "io" "os" "strconv" @@ -59,6 +60,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") var DefaultTipSetCacheSize = 8192 var DefaultMsgMetaCacheSize = 2048 +var ErrNotifeeDone = errors.New("notifee is done and should be removed") + func init() { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { tscs, err := strconv.Atoi(s) @@ -404,11 +407,36 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo apply[i], apply[opp] = apply[opp], apply[i] } - for _, hcf := range notifees { - if err := hcf(revert, apply); err != nil { + var toremove map[int]struct{} + for i, hcf := range notifees { + err := hcf(revert, apply) + + switch err { + case nil: + + case ErrNotifeeDone: + if toremove == nil { + toremove = make(map[int]struct{}) + } + toremove[i] = struct{}{} + + default: log.Error("head change func errored (BAD): ", err) } } + + if len(toremove) > 0 { + newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove)) + for i, hcf := range notifees { + _, remove := toremove[i] + if remove { + continue + } + newNotifees = append(newNotifees, hcf) + } + notifees = newNotifees + } + case <-ctx.Done(): return } diff --git a/node/modules/services.go b/node/modules/services.go index e0a7c2eda..2fbd3d299 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,7 @@ package modules import ( "context" + "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -25,6 +26,7 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/peermgr" marketevents "github.com/filecoin-project/lotus/markets/loggers" @@ -82,14 +84,45 @@ func RunChainExchange(h host.Host, svc exchange.Server) { h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } +func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { + nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint + + // early check, are we synced at start up? + ts := stmgr.ChainStore().GetHeaviestTipSet() + timestamp := ts.MinTimestamp() + now := uint64(build.Clock.Now().UnixNano()) + if timestamp > now-nearsync { + subscribe() + return + } + + // we are not synced, subscribe to head changes and wait for sync + stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + if len(app) == 0 { + return nil + } + + latest := app[0].MinTimestamp() + for _, ts := range app[1:] { + timestamp := ts.MinTimestamp() + if timestamp > latest { + latest = timestamp + } + } + + now := uint64(build.Clock.Now().UnixNano()) + if latest > now-nearsync { + subscribe() + return store.ErrNotifeeDone + } + + return nil + }) +} + func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) - blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint - if err != nil { - panic(err) - } - v := sub.NewBlockValidator( h.ID(), chain, stmgr, func(p peer.ID) { @@ -101,24 +134,43 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P panic(err) } - go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) -} + log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn)) -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { - ctx := helpers.LifecycleCtx(mctx, lc) - - msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck + blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint if err != nil { panic(err) } + go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) +} + +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { + ctx := helpers.LifecycleCtx(mctx, lc) + v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err) } - go sub.HandleIncomingMessages(ctx, mpool, msgsub) + subscribe := func() { + log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn)) + + msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint + if err != nil { + panic(err) + } + + go sub.HandleIncomingMessages(ctx, mpool, msgsub) + } + + if bootstrapper { + subscribe() + return + } + + // wait until we are synced within 10 epochs + waitForSync(stmgr, 10, subscribe) } func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {