c463582528
LifecycleCtx can _only_ be called during startup as it appends an fx hook. Worse, this was causing us to append an fx hook on every single hello message, leaking memory (and probably causing other shutdown issues...).
184 lines
5.5 KiB
Go
184 lines
5.5 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
eventbus "github.com/libp2p/go-eventbus"
|
|
event "github.com/libp2p/go-libp2p-core/event"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-fil-markets/discovery"
|
|
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain"
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
|
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/sub"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
|
"github.com/filecoin-project/lotus/node/hello"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
)
|
|
|
|
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
|
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
|
|
|
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
|
|
}
|
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
go func() {
|
|
for evt := range sub.Out() {
|
|
pic := evt.(event.EvtPeerIdentificationCompleted)
|
|
go func() {
|
|
if err := svc.SayHello(ctx, pic.Peer); err != nil {
|
|
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
|
|
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
|
|
if protosContains(protos, hello.ProtocolID) {
|
|
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
} else {
|
|
log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func protosContains(protos []string, search string) bool {
|
|
for _, p := range protos {
|
|
if p == search {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
|
|
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
|
}
|
|
|
|
func RunChainExchange(h host.Host, svc exchange.Server) {
|
|
h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) // old
|
|
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
|
|
}
|
|
|
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
v := sub.NewBlockValidator(
|
|
h.ID(), chain, stmgr,
|
|
func(p peer.ID) {
|
|
ps.BlacklistPeer(p)
|
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
|
})
|
|
|
|
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
|
}
|
|
|
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
|
|
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
|
}
|
|
|
|
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
|
|
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
local.OnReady(marketevents.ReadyLogger("discovery"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
return local.Start(ctx)
|
|
},
|
|
})
|
|
return local, nil
|
|
}
|
|
|
|
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
|
|
return discoveryimpl.Multi(l)
|
|
}
|
|
|
|
type RandomBeaconParams struct {
|
|
fx.In
|
|
|
|
PubSub *pubsub.PubSub `optional:"true"`
|
|
Cs *store.ChainStore
|
|
DrandConfig dtypes.DrandSchedule
|
|
}
|
|
|
|
func BuiltinDrandConfig() dtypes.DrandSchedule {
|
|
return build.DrandConfigSchedule()
|
|
}
|
|
|
|
func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
|
gen, err := p.Cs.GetGenesis()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
shd := beacon.Schedule{}
|
|
for _, dc := range p.DrandConfig {
|
|
bc, err := drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, dc.Config)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("creating drand beacon: %w", err)
|
|
}
|
|
shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc})
|
|
}
|
|
|
|
return shd, nil
|
|
}
|
|
|
|
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
|
jrnl, err := journal.OpenFSJournal(lr, disabled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error { return jrnl.Close() },
|
|
})
|
|
|
|
return jrnl, err
|
|
}
|