subscribe early in bootstrappers and get rid of ugly test flag
This commit is contained in:
parent
e33798ff90
commit
81e674ad36
@ -31,9 +31,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
)
|
)
|
||||||
|
|
||||||
// for tests
|
|
||||||
var PubsubSubscribeImmediately = false
|
|
||||||
|
|
||||||
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
||||||
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
||||||
|
|
||||||
@ -115,7 +112,7 @@ func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
v := sub.NewBlockValidator(
|
v := sub.NewBlockValidator(
|
||||||
@ -140,8 +137,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
}
|
}
|
||||||
|
|
||||||
// for tests
|
if bootstrapper {
|
||||||
if PubsubSubscribeImmediately {
|
|
||||||
subscribe()
|
subscribe()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -150,7 +146,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
waitForSync(stmgr, 10, subscribe)
|
waitForSync(stmgr, 10, subscribe)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
v := sub.NewMessageValidator(h.ID(), mpool)
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||||
@ -170,8 +166,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
|
|||||||
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
||||||
}
|
}
|
||||||
|
|
||||||
// for tests
|
if bootstrapper {
|
||||||
if PubsubSubscribeImmediately {
|
|
||||||
subscribe()
|
subscribe()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user