From c77f5f62528228c5e0ec9998a41b3bd006a7a55f Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Tue, 8 Sep 2020 00:32:26 -0400 Subject: [PATCH] Revert "only subscribe to pubsub topics once we are synced" --- chain/store/store.go | 31 ++------------ chain/sync_test.go | 3 -- node/modules/services.go | 91 +++++++--------------------------------- node/test/builder.go | 3 -- 4 files changed, 17 insertions(+), 111 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 398828b1a..e0a997686 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "encoding/json" - "errors" "io" "os" "strconv" @@ -52,8 +51,6 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") var DefaultTipSetCacheSize = 8192 var DefaultMsgMetaCacheSize = 2048 -var ErrNotifieeDone = errors.New("notifee is done and should be removed") - func init() { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { tscs, err := strconv.Atoi(s) @@ -361,33 +358,11 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo apply[i], apply[opp] = apply[opp], apply[i] } - var toremove map[int]struct{} - for i, hcf := range notifees { - err := hcf(revert, apply) - if err != nil { - if err == ErrNotifieeDone { - if toremove == nil { - toremove = make(map[int]struct{}) - } - toremove[i] = struct{}{} - } else { - log.Error("head change func errored (BAD): ", err) - } + for _, hcf := range notifees { + if err := hcf(revert, apply); err != nil { + log.Error("head change func errored (BAD): ", err) } } - - if len(toremove) > 0 { - newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove)) - for i, hcf := range notifees { - _, remove := toremove[i] - if remove { - continue - } - newNotifees = append(newNotifees, hcf) - } - notifees = newNotifees - } - case <-ctx.Done(): return } diff --git a/chain/sync_test.go b/chain/sync_test.go index 16e0d2ffb..f91929a02 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -34,7 +34,6 @@ import ( "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/modules" - "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) @@ -234,7 +233,6 @@ func (tu *syncTestUtil) addSourceNode(gen int) { node.Repo(sourceRepo), node.MockHost(tu.mn), node.Test(), - node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), ) @@ -267,7 +265,6 @@ func (tu *syncTestUtil) addClientNode() int { node.Repo(repo.NewMemory(nil)), node.MockHost(tu.mn), node.Test(), - node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)), ) diff --git a/node/modules/services.go b/node/modules/services.go index 22d99fa54..fc7486abe 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -1,8 +1,6 @@ package modules import ( - "time" - "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" eventbus "github.com/libp2p/go-eventbus" @@ -24,7 +22,6 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -76,45 +73,14 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) { h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream) } -func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { - nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint - - // early check, are we synced at start up? - ts := stmgr.ChainStore().GetHeaviestTipSet() - timestamp := ts.MinTimestamp() - now := uint64(build.Clock.Now().UnixNano()) - if timestamp > now-nearsync { - subscribe() - return - } - - // we are not synced, subscribe to head changes and wait for sync - stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { - if len(app) == 0 { - return nil - } - - latest := app[0].MinTimestamp() - for _, ts := range app[1:] { - timestamp := ts.MinTimestamp() - if timestamp > latest { - latest = timestamp - } - } - - now := uint64(build.Clock.Now().UnixNano()) - if latest > now-nearsync { - subscribe() - return store.ErrNotifieeDone - } - - return nil - }) -} - -func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { +func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) + blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint + if err != nil { + panic(err) + } + v := sub.NewBlockValidator( h.ID(), chain, stmgr, func(p peer.ID) { @@ -126,53 +92,24 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P panic(err) } - subscribe := func() { - log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn)) - - blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint - if err != nil { - panic(err) - } - - go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) - } - - if bootstrapper { - subscribe() - return - } - - // wait until we are synced within 10 blocks - waitForSync(stmgr, 10, subscribe) + go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) } -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) + msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck + if err != nil { + panic(err) + } + v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err) } - subscribe := func() { - log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn)) - - msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint - if err != nil { - panic(err) - } - - go sub.HandleIncomingMessages(ctx, mpool, msgsub) - } - - if bootstrapper { - subscribe() - return - } - - // wait until we are synced within 1 block - waitForSync(stmgr, 1, subscribe) + go sub.HandleIncomingMessages(ctx, mpool, msgsub) } func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local { diff --git a/node/test/builder.go b/node/test/builder.go index 20a54efa4..de2071e7a 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -33,7 +33,6 @@ import ( miner2 "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" - "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/mockstorage" @@ -371,8 +370,6 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), - node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), - genesis, ) if err != nil {