From 32b779739433adda6bccb2a219b7aac3f3afb63a Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 8 Sep 2020 09:16:34 +0300 Subject: [PATCH 1/5] only subscribe to messages topic if we are synced within 10 epochs --- chain/store/store.go | 32 +++++++++++++++-- node/modules/services.go | 76 +++++++++++++++++++++++++++++++++------- 2 files changed, 94 insertions(+), 14 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 00a78500e..cf934f257 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "encoding/json" + "errors" "io" "os" "strconv" @@ -59,6 +60,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") var DefaultTipSetCacheSize = 8192 var DefaultMsgMetaCacheSize = 2048 +var ErrNotifeeDone = errors.New("notifee is done and should be removed") + func init() { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { tscs, err := strconv.Atoi(s) @@ -404,11 +407,36 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo apply[i], apply[opp] = apply[opp], apply[i] } - for _, hcf := range notifees { - if err := hcf(revert, apply); err != nil { + var toremove map[int]struct{} + for i, hcf := range notifees { + err := hcf(revert, apply) + + switch err { + case nil: + + case ErrNotifeeDone: + if toremove == nil { + toremove = make(map[int]struct{}) + } + toremove[i] = struct{}{} + + default: log.Error("head change func errored (BAD): ", err) } } + + if len(toremove) > 0 { + newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove)) + for i, hcf := range notifees { + _, remove := toremove[i] + if remove { + continue + } + newNotifees = append(newNotifees, hcf) + } + notifees = newNotifees + } + case <-ctx.Done(): return } diff --git a/node/modules/services.go b/node/modules/services.go index e0a7c2eda..2fbd3d299 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,7 @@ package modules import ( "context" + "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -25,6 +26,7 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/peermgr" marketevents "github.com/filecoin-project/lotus/markets/loggers" @@ -82,14 +84,45 @@ func RunChainExchange(h host.Host, svc exchange.Server) { h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } +func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { + nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint + + // early check, are we synced at start up? + ts := stmgr.ChainStore().GetHeaviestTipSet() + timestamp := ts.MinTimestamp() + now := uint64(build.Clock.Now().UnixNano()) + if timestamp > now-nearsync { + subscribe() + return + } + + // we are not synced, subscribe to head changes and wait for sync + stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + if len(app) == 0 { + return nil + } + + latest := app[0].MinTimestamp() + for _, ts := range app[1:] { + timestamp := ts.MinTimestamp() + if timestamp > latest { + latest = timestamp + } + } + + now := uint64(build.Clock.Now().UnixNano()) + if latest > now-nearsync { + subscribe() + return store.ErrNotifeeDone + } + + return nil + }) +} + func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) - blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint - if err != nil { - panic(err) - } - v := sub.NewBlockValidator( h.ID(), chain, stmgr, func(p peer.ID) { @@ -101,24 +134,43 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P panic(err) } - go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) -} + log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn)) -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { - ctx := helpers.LifecycleCtx(mctx, lc) - - msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck + blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint if err != nil { panic(err) } + go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) +} + +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { + ctx := helpers.LifecycleCtx(mctx, lc) + v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err) } - go sub.HandleIncomingMessages(ctx, mpool, msgsub) + subscribe := func() { + log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn)) + + msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint + if err != nil { + panic(err) + } + + go sub.HandleIncomingMessages(ctx, mpool, msgsub) + } + + if bootstrapper { + subscribe() + return + } + + // wait until we are synced within 10 epochs + waitForSync(stmgr, 10, subscribe) } func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { From 3306bdec293ac75866e1935b90d099841ed64b0b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 8 Sep 2020 09:20:57 +0300 Subject: [PATCH 2/5] fix tests --- node/test/builder.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/test/builder.go b/node/test/builder.go index ea9a82220..9e5ffc40d 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -38,6 +38,7 @@ import ( lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/mockstorage" @@ -403,6 +404,9 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), + // so that we subscribe to pubsub topics immediately + node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), + genesis, fullOpts[i].Opts(fulls), From c6cd699f0d3aafafda13873cb730ab79d163d7e3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 8 Sep 2020 10:21:20 +0300 Subject: [PATCH 3/5] fix nearsync check --- node/modules/services.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/modules/services.go b/node/modules/services.go index 2fbd3d299..62f8063e9 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -85,12 +85,12 @@ func RunChainExchange(h host.Host, svc exchange.Server) { } func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { - nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint + nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) //nolint // early check, are we synced at start up? ts := stmgr.ChainStore().GetHeaviestTipSet() timestamp := ts.MinTimestamp() - now := uint64(build.Clock.Now().UnixNano()) + now := uint64(build.Clock.Now().Unix()) if timestamp > now-nearsync { subscribe() return @@ -110,7 +110,7 @@ func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { } } - now := uint64(build.Clock.Now().UnixNano()) + now := uint64(build.Clock.Now().Unix()) if latest > now-nearsync { subscribe() return store.ErrNotifeeDone From 0e40a04975c9598c924d8a41c120d07d79a87814 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 8 Sep 2020 10:55:14 +0300 Subject: [PATCH 4/5] nicer check using Time objects --- node/modules/services.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/node/modules/services.go b/node/modules/services.go index 62f8063e9..e7a2fddf6 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -85,13 +85,13 @@ func RunChainExchange(h host.Host, svc exchange.Server) { } func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { - nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) //nolint + nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second // early check, are we synced at start up? ts := stmgr.ChainStore().GetHeaviestTipSet() timestamp := ts.MinTimestamp() - now := uint64(build.Clock.Now().Unix()) - if timestamp > now-nearsync { + timestampTime := time.Unix(int64(timestamp), 0) + if build.Clock.Since(timestampTime) < nearsync { subscribe() return } @@ -110,8 +110,8 @@ func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { } } - now := uint64(build.Clock.Now().Unix()) - if latest > now-nearsync { + latestTime := time.Unix(int64(latest), 0) + if build.Clock.Since(latestTime) < nearsync { subscribe() return store.ErrNotifeeDone } From dab20efc993938fa3a5f83bb19aabd89010a4102 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 10 Sep 2020 16:22:02 +0300 Subject: [PATCH 5/5] add env var controlling msgs sync epochs --- node/modules/services.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/node/modules/services.go b/node/modules/services.go index e7a2fddf6..011b89163 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,8 @@ package modules import ( "context" + "os" + "strconv" "time" "github.com/ipfs/go-datastore" @@ -36,6 +38,19 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) +var pubsubMsgsSyncEpochs = 10 + +func init() { + if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" { + val, err := strconv.Atoi(s) + if err != nil { + log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err) + return + } + pubsubMsgsSyncEpochs = val + } +} + func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error { h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) @@ -169,8 +184,8 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub return } - // wait until we are synced within 10 epochs - waitForSync(stmgr, 10, subscribe) + // wait until we are synced within 10 epochs -- env var can override + waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {