f7c1ceabe2
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
153 lines
4.7 KiB
Go
153 lines
4.7 KiB
Go
package modules
|
|
|
|
import (
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
eventbus "github.com/libp2p/go-eventbus"
|
|
event "github.com/libp2p/go-libp2p-core/event"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain"
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
|
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/sub"
|
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
|
"github.com/filecoin-project/lotus/node/hello"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
)
|
|
|
|
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
|
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
|
|
|
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
|
|
}
|
|
|
|
go func() {
|
|
for evt := range sub.Out() {
|
|
pic := evt.(event.EvtPeerIdentificationCompleted)
|
|
go func() {
|
|
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil {
|
|
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
|
|
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
|
|
if protosContains(protos, hello.ProtocolID) {
|
|
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
} else {
|
|
log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// protosContains reports whether search is an element of protos.
// The comparison is an exact string match; a nil or empty slice yields false.
func protosContains(protos []string, search string) bool {
	for i := 0; i < len(protos); i++ {
		if protos[i] == search {
			return true
		}
	}
	return false
}
|
|
|
|
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
|
|
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
|
}
|
|
|
|
func RunChainExchange(h host.Host, svc exchange.Server) {
|
|
h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) // old
|
|
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
|
|
}
|
|
|
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
v := sub.NewBlockValidator(
|
|
h.ID(), chain, stmgr,
|
|
func(p peer.ID) {
|
|
ps.BlacklistPeer(p)
|
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
|
})
|
|
|
|
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
|
}
|
|
|
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
|
|
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
|
}
|
|
|
|
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
|
return discovery.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
|
|
}
|
|
|
|
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
|
|
return discovery.Multi(l)
|
|
}
|
|
|
|
type RandomBeaconParams struct {
|
|
fx.In
|
|
|
|
PubSub *pubsub.PubSub `optional:"true"`
|
|
Cs *store.ChainStore
|
|
DrandConfig dtypes.DrandSchedule
|
|
}
|
|
|
|
func BuiltinDrandConfig() dtypes.DrandSchedule {
|
|
return build.DrandConfigSchedule()
|
|
}
|
|
|
|
func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
|
gen, err := p.Cs.GetGenesis()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
shd := beacon.Schedule{}
|
|
for _, dc := range p.DrandConfig {
|
|
bc, err := drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, dc.Config)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("creating drand beacon: %w", err)
|
|
}
|
|
shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc})
|
|
}
|
|
|
|
return shd, nil
|
|
}
|