lotus/node/modules/services.go

56 lines
1.5 KiB
Go
Raw Normal View History

2019-07-08 14:07:09 +00:00
package modules
import (
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/sub"
"github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
2019-07-08 14:07:09 +00:00
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) {
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
bundle := inet.NotifyBundle{
ConnectedF: func(_ inet.Network, c inet.Conn) {
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil {
log.Warnw("failed to say hello", "error", err)
return
}
}()
},
}
h.Network().Notify(&bundle)
}
func RunBlockSync(h host.Host, svc *chain.BlockSyncService) {
h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream)
}
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) {
ctx := helpers.LifecycleCtx(mctx, lc)
blocksub, err := pubsub.Subscribe("/fil/blocks")
if err != nil {
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s)
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) {
ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := pubsub.Subscribe("/fil/messages")
if err != nil {
panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}