lotus/node/modules/services.go
2020-08-26 16:38:23 +01:00

161 lines
4.9 KiB
Go

package modules
import (
"context"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
eventbus "github.com/libp2p/go-eventbus"
event "github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
if err != nil {
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
}
go func() {
for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted)
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil {
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
if protosContains(protos, hello.ProtocolID) {
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
} else {
log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
}
return
}
}()
}
}()
return nil
}
func protosContains(protos []string, search string) bool {
for _, p := range protos {
if p == search {
return true
}
}
return false
}
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}
func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
}
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}
v := sub.NewBlockValidator(
h.ID(), chain, stmgr,
func(p peer.ID) {
ps.BlacklistPeer(p)
h.ConnManager().TagPeer(p, "badblock", -1000)
})
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
if err != nil {
panic(err)
}
v := sub.NewMessageValidator(h.ID(), mpool)
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
return discovery.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
}
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
return discovery.Multi(l)
}
type RandomBeaconParams struct {
fx.In
PubSub *pubsub.PubSub `optional:"true"`
Cs *store.ChainStore
DrandConfig dtypes.DrandConfig
}
func BuiltinDrandConfig() dtypes.DrandConfig {
return build.DrandConfig()
}
func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.RandomBeacon, error) {
gen, err := p.Cs.GetGenesis()
if err != nil {
return nil, err
}
//return beacon.NewMockBeacon(build.BlockDelaySecs * time.Second)
return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, p.DrandConfig)
}
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
jrnl, err := journal.OpenFSJournal(lr, disabled)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return jrnl.Close() },
})
return jrnl, err
}