Merge pull request #3646 from filecoin-project/feat/delayed-pubsub-subscribe
delayed pubsub subscribe for messages topic
This commit is contained in:
commit
1af1ceb1e0
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
@ -59,6 +60,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
|
||||
var DefaultTipSetCacheSize = 8192
|
||||
var DefaultMsgMetaCacheSize = 2048
|
||||
|
||||
var ErrNotifeeDone = errors.New("notifee is done and should be removed")
|
||||
|
||||
func init() {
|
||||
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
|
||||
tscs, err := strconv.Atoi(s)
|
||||
@ -435,11 +438,36 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
||||
apply[i], apply[opp] = apply[opp], apply[i]
|
||||
}
|
||||
|
||||
for _, hcf := range notifees {
|
||||
if err := hcf(revert, apply); err != nil {
|
||||
var toremove map[int]struct{}
|
||||
for i, hcf := range notifees {
|
||||
err := hcf(revert, apply)
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
|
||||
case ErrNotifeeDone:
|
||||
if toremove == nil {
|
||||
toremove = make(map[int]struct{})
|
||||
}
|
||||
toremove[i] = struct{}{}
|
||||
|
||||
default:
|
||||
log.Error("head change func errored (BAD): ", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(toremove) > 0 {
|
||||
newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove))
|
||||
for i, hcf := range notifees {
|
||||
_, remove := toremove[i]
|
||||
if remove {
|
||||
continue
|
||||
}
|
||||
newNotifees = append(newNotifees, hcf)
|
||||
}
|
||||
notifees = newNotifees
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
@ -2,6 +2,9 @@ package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
@ -25,6 +28,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/sub"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||
@ -34,6 +38,19 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
var pubsubMsgsSyncEpochs = 10
|
||||
|
||||
func init() {
|
||||
if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
|
||||
val, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
|
||||
return
|
||||
}
|
||||
pubsubMsgsSyncEpochs = val
|
||||
}
|
||||
}
|
||||
|
||||
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
||||
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
||||
|
||||
@ -82,14 +99,45 @@ func RunChainExchange(h host.Host, svc exchange.Server) {
|
||||
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
|
||||
}
|
||||
|
||||
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
|
||||
nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second
|
||||
|
||||
// early check, are we synced at start up?
|
||||
ts := stmgr.ChainStore().GetHeaviestTipSet()
|
||||
timestamp := ts.MinTimestamp()
|
||||
timestampTime := time.Unix(int64(timestamp), 0)
|
||||
if build.Clock.Since(timestampTime) < nearsync {
|
||||
subscribe()
|
||||
return
|
||||
}
|
||||
|
||||
// we are not synced, subscribe to head changes and wait for sync
|
||||
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
||||
if len(app) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
latest := app[0].MinTimestamp()
|
||||
for _, ts := range app[1:] {
|
||||
timestamp := ts.MinTimestamp()
|
||||
if timestamp > latest {
|
||||
latest = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
latestTime := time.Unix(int64(latest), 0)
|
||||
if build.Clock.Since(latestTime) < nearsync {
|
||||
subscribe()
|
||||
return store.ErrNotifeeDone
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
v := sub.NewBlockValidator(
|
||||
h.ID(), chain, stmgr,
|
||||
func(p peer.ID) {
|
||||
@ -101,24 +149,43 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||
}
|
||||
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
|
||||
|
||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
|
||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||
}
|
||||
|
||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||
|
||||
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
||||
subscribe := func() {
|
||||
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
|
||||
|
||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
||||
}
|
||||
|
||||
if bootstrapper {
|
||||
subscribe()
|
||||
return
|
||||
}
|
||||
|
||||
// wait until we are synced within 10 epochs -- env var can override
|
||||
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
|
||||
}
|
||||
|
||||
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
|
||||
|
@ -40,6 +40,7 @@ import (
|
||||
lotusminer "github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/mockstorage"
|
||||
@ -412,6 +413,9 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
|
||||
|
||||
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
|
||||
|
||||
// so that we subscribe to pubsub topics immediately
|
||||
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
|
||||
|
||||
genesis,
|
||||
|
||||
fullOpts[i].Opts(fulls),
|
||||
|
Loading…
Reference in New Issue
Block a user