package modules import ( "context" "os" "strconv" "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-eventbus" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/discovery" discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon/drand" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/lib/peermgr" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" ) var pubsubMsgsSyncEpochs = 10 func init() { if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" { val, err := strconv.Atoi(s) if err != nil { log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err) return } pubsubMsgsSyncEpochs = val } } func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error { h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024)) if err != nil { return xerrors.Errorf("failed to subscribe to event bus: %w", err) } ctx := helpers.LifecycleCtx(mctx, lc) go func() { for evt := range sub.Out() { pic := evt.(event.EvtPeerIdentificationCompleted) go func() { if err := svc.SayHello(ctx, pic.Peer); err != nil { protos, _ := h.Peerstore().GetProtocols(pic.Peer) agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion") if protosContains(protos, hello.ProtocolID) { log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent) } else { log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent) } return } }() } }() return nil } func protosContains(protos []string, search string) bool { for _, p := range protos { if p == search { return true } } return false } func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) { go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) } func RunChainExchange(h host.Host, svc exchange.Server) { h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second // early check, are we synced at start up? ts := stmgr.ChainStore().GetHeaviestTipSet() timestamp := ts.MinTimestamp() timestampTime := time.Unix(int64(timestamp), 0) if build.Clock.Since(timestampTime) < nearsync { subscribe() return } // we are not synced, subscribe to head changes and wait for sync stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { if len(app) == 0 { return nil } latest := app[0].MinTimestamp() for _, ts := range app[1:] { timestamp := ts.MinTimestamp() if timestamp > latest { latest = timestamp } } latestTime := time.Unix(int64(latest), 0) if build.Clock.Since(latestTime) < nearsync { subscribe() return store.ErrNotifeeDone } return nil }) } func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, cns consensus.Consensus, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) v := sub.NewBlockValidator( h.ID(), chain, cns, func(p peer.ID) { ps.BlacklistPeer(p) h.ConnManager().TagPeer(p, "badblock", -1000) }) if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil { panic(err) } log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn)) blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint if err != nil { panic(err) } go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) } func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { ctx := helpers.LifecycleCtx(mctx, lc) v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err) } subscribe := func() { log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn)) msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint if err != nil { panic(err) } go sub.HandleIncomingMessages(ctx, mpool, msgsub) } if bootstrapper { subscribe() return } // wait until we are synced within 10 epochs -- env var can override waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error { topicName := build.IndexerIngestTopic(nn) v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule) if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) } topicHandle, err := ps.Join(topicName) if err != nil { return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err) } cancelFunc, err := topicHandle.Relay() if err != nil { return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err) } // Cancel message relay on shutdown. lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { cancelFunc() return nil }, }) log.Infof("relaying messages for pubsub topic %s", topicName) return nil } func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local"))) if err != nil { return nil, err } local.OnReady(marketevents.ReadyLogger("discovery")) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return local.Start(ctx) }, }) return local, nil } func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver { return discoveryimpl.Multi(l) } type RandomBeaconParams struct { fx.In PubSub *pubsub.PubSub `optional:"true"` Cs *store.ChainStore DrandConfig dtypes.DrandSchedule } func BuiltinDrandConfig() dtypes.DrandSchedule { return build.DrandConfigSchedule() } func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) { gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc)) if err != nil { return nil, err } shd := beacon.Schedule{} for _, dc := range p.DrandConfig { bc, err := drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, dc.Config) if err != nil { return nil, xerrors.Errorf("creating drand beacon: %w", err) } shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc}) } return shd, nil } func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) { jrnl, err := fsjournal.OpenFSJournal(lr, disabled) if err != nil { return nil, err } lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { return jrnl.Close() }, }) return jrnl, err }