lotus/node/modules/services.go
Andrew Jackson (Ajax) a520ee85d6 master-to-sturdy
2023-09-21 10:37:02 -05:00

283 lines
8.2 KiB
Go

package modules
import (
"context"
"os"
"strconv"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/discovery"
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/peermgr"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
var pubsubMsgsSyncEpochs = 10
func init() {
if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
val, err := strconv.Atoi(s)
if err != nil {
log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
return
}
pubsubMsgsSyncEpochs = val
}
}
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
if err != nil {
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
}
ctx := helpers.LifecycleCtx(mctx, lc)
go func() {
// We want to get information on connected peers, we don't want to trigger new connections.
ctx := network.WithNoDial(ctx, "filecoin hello")
for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted)
// We just finished identifying the peer, that means we should know what
// protocols it speaks. Check if it speeks the Filecoin hello protocol
// before continuing.
if p, _ := h.Peerstore().FirstSupportedProtocol(pic.Peer, hello.ProtocolID); p != hello.ProtocolID {
continue
}
go func() {
if err := svc.SayHello(ctx, pic.Peer); err != nil {
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
}
}()
}
}()
return nil
}
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}
func RunChainExchange(h host.Host, svc exchange.Server) {
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
}
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second
// early check, are we synced at start up?
ts := stmgr.ChainStore().GetHeaviestTipSet()
timestamp := ts.MinTimestamp()
timestampTime := time.Unix(int64(timestamp), 0)
if build.Clock.Since(timestampTime) < nearsync {
subscribe()
return
}
// we are not synced, subscribe to head changes and wait for sync
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
if len(app) == 0 {
return nil
}
latest := app[0].MinTimestamp()
for _, ts := range app[1:] {
timestamp := ts.MinTimestamp()
if timestamp > latest {
latest = timestamp
}
}
latestTime := time.Unix(int64(latest), 0)
if build.Clock.Since(latestTime) < nearsync {
subscribe()
return store.ErrNotifeeDone
}
return nil
})
}
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
lc fx.Lifecycle,
ps *pubsub.PubSub,
s *chain.Syncer,
bserv dtypes.ChainBlockService,
chain *store.ChainStore,
cns consensus.Consensus,
h host.Host,
nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)
v := sub.NewBlockValidator(
h.ID(), chain, cns,
func(p peer.ID) {
ps.BlacklistPeer(p)
h.ConnManager().TagPeer(p, "badblock", -1000)
})
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
panic(err)
}
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
ctx := helpers.LifecycleCtx(mctx, lc)
v := sub.NewMessageValidator(h.ID(), mpool)
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
panic(err)
}
subscribe := func() {
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
if err != nil {
panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
if bootstrapper {
subscribe()
return
}
// wait until we are synced within 10 epochs -- env var can override
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
}
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error {
topicName := build.IndexerIngestTopic(nn)
v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule)
if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
}
topicHandle, err := ps.Join(topicName)
if err != nil {
return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err)
}
cancelFunc, err := topicHandle.Relay()
if err != nil {
return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
}
// Cancel message relay on shutdown.
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
cancelFunc()
return nil
},
})
log.Infof("relaying messages for pubsub topic %s", topicName)
return nil
}
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
if err != nil {
return nil, err
}
local.OnReady(marketevents.ReadyLogger("discovery"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return local.Start(ctx)
},
})
return local, nil
}
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
return discoveryimpl.Multi(l)
}
type RandomBeaconParams struct {
fx.In
PubSub *pubsub.PubSub `optional:"true"`
Cs *store.ChainStore
DrandConfig dtypes.DrandSchedule
}
func BuiltinDrandConfig() dtypes.DrandSchedule {
return build.DrandConfigSchedule()
}
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
if err != nil {
return nil, err
}
shd, err := drand.BeaconScheduleFromDrandSchedule(p.DrandConfig, gen.Timestamp, p.PubSub)
if err != nil {
return nil, xerrors.Errorf("failed to create beacon schedule: %w", err)
}
return shd, nil
}
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return jrnl.Close() },
})
return jrnl, err
}