lotus/node/modules/services.go

130 lines
3.8 KiB
Go
Raw Normal View History

2019-07-08 14:07:09 +00:00
package modules
import (
2019-08-01 17:12:41 +00:00
"context"
eventbus "github.com/libp2p/go-eventbus"
event "github.com/libp2p/go-libp2p-core/event"
2019-07-08 14:07:09 +00:00
"github.com/libp2p/go-libp2p-core/host"
2020-02-17 05:51:18 +00:00
peer "github.com/libp2p/go-libp2p-peer"
2019-07-08 14:07:09 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
2020-01-10 18:01:48 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
2020-04-14 03:05:19 +00:00
"github.com/filecoin-project/lotus/chain/beacon/drand"
2019-11-09 23:00:22 +00:00
"github.com/filecoin-project/lotus/chain/blocksync"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2020-04-14 03:05:19 +00:00
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub"
2020-02-22 11:36:22 +00:00
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
2019-07-08 14:07:09 +00:00
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
2019-07-08 14:07:09 +00:00
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
if err != nil {
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
}
go func() {
for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted)
2019-07-08 14:07:09 +00:00
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil {
2019-07-08 14:07:09 +00:00
log.Warnw("failed to say hello", "error", err)
return
}
}()
}
}()
return nil
2019-07-08 14:07:09 +00:00
}
2019-10-17 08:57:56 +00:00
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}
2019-11-09 23:00:22 +00:00
func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
2019-07-08 14:07:09 +00:00
}
2020-03-31 23:13:37 +00:00
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, h host.Host, nn dtypes.NetworkName) {
2019-07-08 14:07:09 +00:00
ctx := helpers.LifecycleCtx(mctx, lc)
2020-03-31 23:13:37 +00:00
blocksub, err := ps.Subscribe(build.BlocksTopic(nn))
2019-07-08 14:07:09 +00:00
if err != nil {
panic(err)
}
2020-02-18 17:20:17 +00:00
v := sub.NewBlockValidator(func(p peer.ID) {
ps.BlacklistPeer(p)
h.ConnManager().TagPeer(p, "badblock", -1000)
})
2020-02-17 05:51:18 +00:00
2020-03-31 23:13:37 +00:00
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
2020-02-17 05:51:18 +00:00
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager())
2019-07-08 14:07:09 +00:00
}
2020-03-31 23:13:37 +00:00
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
2019-07-08 14:07:09 +00:00
ctx := helpers.LifecycleCtx(mctx, lc)
2020-03-31 23:13:37 +00:00
msgsub, err := ps.Subscribe(build.MessagesTopic(nn))
2019-07-08 14:07:09 +00:00
if err != nil {
panic(err)
}
2020-02-28 01:39:07 +00:00
v := sub.NewMessageValidator(mpool)
2020-02-17 05:51:18 +00:00
2020-03-31 23:13:37 +00:00
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
2020-02-17 05:51:18 +00:00
panic(err)
}
2019-07-08 14:07:09 +00:00
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
2019-08-01 17:12:41 +00:00
func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.StorageClient) {
2019-09-10 12:35:43 +00:00
ctx := helpers.LifecycleCtx(mctx, lc)
2019-08-01 17:12:41 +00:00
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
2019-09-10 12:35:43 +00:00
c.Run(ctx)
2019-08-01 17:12:41 +00:00
return nil
},
OnStop: func(context.Context) error {
c.Stop()
return nil
},
})
}
2019-08-14 20:27:10 +00:00
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
return discovery.NewLocal(ds)
}
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
2019-08-26 13:45:36 +00:00
return discovery.Multi(l)
}
2020-04-14 03:05:19 +00:00
func RandomBeacon(cs *store.ChainStore, _ dtypes.AfterGenesisSet) (beacon.RandomBeacon, error) {
gen, err := cs.GetGenesis()
if err != nil {
return nil, err
}
//return beacon.NewMockBeacon(build.BlockDelay * time.Second)
return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelay)
}