only subscribed to pubsub topics once we are synced
This commit is contained in:
parent
0ad0d4ea11
commit
666dc65b8e
@ -1,6 +1,8 @@
|
|||||||
package modules
|
package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
eventbus "github.com/libp2p/go-eventbus"
|
eventbus "github.com/libp2p/go-eventbus"
|
||||||
@ -22,6 +24,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
@ -73,14 +76,50 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
|
|||||||
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForSync(stmgr *stmgr.StateManager, blocks int, subscribe func()) {
|
||||||
|
nearsync := uint64(blocks) * uint64(build.BlockDelaySecs) * uint64(time.Second)
|
||||||
|
|
||||||
|
// early check, are we synced at start up?
|
||||||
|
ts := stmgr.ChainStore().GetHeaviestTipSet()
|
||||||
|
timestamp := ts.MinTimestamp()
|
||||||
|
now := uint64(build.Clock.Now().UnixNano())
|
||||||
|
if timestamp > now-nearsync {
|
||||||
|
subscribe()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are not synced, subscribe to head changes and wait for sync
|
||||||
|
subscribed := false
|
||||||
|
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
||||||
|
if subscribed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(app) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
latest := app[0].MinTimestamp()
|
||||||
|
for _, ts := range app[1:] {
|
||||||
|
timestamp := ts.MinTimestamp()
|
||||||
|
if timestamp > latest {
|
||||||
|
latest = timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
now := uint64(build.Clock.Now().UnixNano())
|
||||||
|
if latest > now-nearsync {
|
||||||
|
subscribe()
|
||||||
|
subscribed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
v := sub.NewBlockValidator(
|
v := sub.NewBlockValidator(
|
||||||
h.ID(), chain, stmgr,
|
h.ID(), chain, stmgr,
|
||||||
func(p peer.ID) {
|
func(p peer.ID) {
|
||||||
@ -92,24 +131,39 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
// wait until we are synced within 10 blocks
|
||||||
|
waitForSync(stmgr, 10, func() {
|
||||||
|
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
|
||||||
|
|
||||||
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
v := sub.NewMessageValidator(h.ID(), mpool)
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
// wait until we are synced within 1 block
|
||||||
|
waitForSync(stmgr, 1, func() {
|
||||||
|
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
|
||||||
|
|
||||||
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
||||||
|
Loading…
Reference in New Issue
Block a user