2019-07-08 14:07:09 +00:00
|
|
|
package modules
|
|
|
|
|
|
|
|
import (
|
2020-08-11 12:48:32 +00:00
|
|
|
"context"
|
2020-09-10 13:22:02 +00:00
|
|
|
"os"
|
|
|
|
"strconv"
|
2020-09-08 06:16:34 +00:00
|
|
|
"time"
|
2020-08-11 12:48:32 +00:00
|
|
|
|
2020-05-20 22:46:44 +00:00
|
|
|
"github.com/ipfs/go-datastore"
|
|
|
|
"github.com/ipfs/go-datastore/namespace"
|
2019-07-08 14:07:09 +00:00
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
2022-08-25 18:20:41 +00:00
|
|
|
"github.com/libp2p/go-libp2p/core/event"
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
2023-03-03 02:37:13 +00:00
|
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
2022-08-25 18:23:25 +00:00
|
|
|
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
2019-07-08 14:07:09 +00:00
|
|
|
"go.uber.org/fx"
|
2020-03-19 04:13:04 +00:00
|
|
|
"golang.org/x/xerrors"
|
2019-07-08 15:14:36 +00:00
|
|
|
|
2020-09-29 11:53:30 +00:00
|
|
|
"github.com/filecoin-project/go-fil-markets/discovery"
|
|
|
|
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
|
|
|
|
|
2020-03-03 23:44:08 +00:00
|
|
|
"github.com/filecoin-project/lotus/build"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain"
|
2020-03-25 23:16:17 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
2020-04-14 03:05:19 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
2022-06-14 15:00:51 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/consensus"
|
2020-09-07 18:31:43 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
2019-12-01 23:11:43 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
2020-05-12 18:05:29 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
2020-04-14 03:05:19 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/store"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/sub"
|
2020-09-08 06:16:34 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
2020-08-11 12:48:32 +00:00
|
|
|
"github.com/filecoin-project/lotus/journal"
|
2021-08-17 12:55:35 +00:00
|
|
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
2020-02-22 11:36:22 +00:00
|
|
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
2020-09-29 11:53:30 +00:00
|
|
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/hello"
|
2022-02-08 12:55:59 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
2019-12-17 03:17:46 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
2020-08-11 12:48:32 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/repo"
|
2019-07-08 14:07:09 +00:00
|
|
|
)
|
|
|
|
|
2020-09-10 13:22:02 +00:00
|
|
|
var pubsubMsgsSyncEpochs = 10
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
|
|
|
|
val, err := strconv.Atoi(s)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
pubsubMsgsSyncEpochs = val
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-19 04:13:04 +00:00
|
|
|
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
2019-07-08 14:07:09 +00:00
|
|
|
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
|
|
|
|
|
2020-03-19 04:13:04 +00:00
|
|
|
sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
|
|
|
|
}
|
|
|
|
|
2020-10-10 15:33:06 +00:00
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
|
2020-03-19 04:13:04 +00:00
|
|
|
go func() {
|
|
|
|
for evt := range sub.Out() {
|
|
|
|
pic := evt.(event.EvtPeerIdentificationCompleted)
|
2019-07-08 14:07:09 +00:00
|
|
|
go func() {
|
2020-10-10 15:33:06 +00:00
|
|
|
if err := svc.SayHello(ctx, pic.Peer); err != nil {
|
2020-07-23 23:58:21 +00:00
|
|
|
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
|
|
|
|
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
|
2020-07-29 21:28:25 +00:00
|
|
|
if protosContains(protos, hello.ProtocolID) {
|
|
|
|
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
|
|
} else {
|
|
|
|
log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
|
|
|
|
}
|
2019-07-08 14:07:09 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
2020-03-19 04:13:04 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
return nil
|
2019-07-08 14:07:09 +00:00
|
|
|
}
|
|
|
|
|
2023-03-03 02:37:13 +00:00
|
|
|
func protosContains(protos []protocol.ID, search protocol.ID) bool {
|
2020-07-29 21:28:25 +00:00
|
|
|
for _, p := range protos {
|
|
|
|
if p == search {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2019-10-17 08:57:56 +00:00
|
|
|
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
|
|
|
|
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
|
|
|
}
|
|
|
|
|
2020-09-07 18:45:34 +00:00
|
|
|
func RunChainExchange(h host.Host, svc exchange.Server) {
|
|
|
|
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
|
2019-07-08 14:07:09 +00:00
|
|
|
}
|
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
|
2020-09-08 07:55:14 +00:00
|
|
|
nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second
|
2019-07-08 14:07:09 +00:00
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
// early check, are we synced at start up?
|
|
|
|
ts := stmgr.ChainStore().GetHeaviestTipSet()
|
|
|
|
timestamp := ts.MinTimestamp()
|
2020-09-08 07:55:14 +00:00
|
|
|
timestampTime := time.Unix(int64(timestamp), 0)
|
|
|
|
if build.Clock.Since(timestampTime) < nearsync {
|
2020-09-08 06:16:34 +00:00
|
|
|
subscribe()
|
|
|
|
return
|
2019-07-08 14:07:09 +00:00
|
|
|
}
|
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
// we are not synced, subscribe to head changes and wait for sync
|
|
|
|
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
|
|
|
if len(app) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
latest := app[0].MinTimestamp()
|
|
|
|
for _, ts := range app[1:] {
|
|
|
|
timestamp := ts.MinTimestamp()
|
|
|
|
if timestamp > latest {
|
|
|
|
latest = timestamp
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-08 07:55:14 +00:00
|
|
|
latestTime := time.Unix(int64(latest), 0)
|
|
|
|
if build.Clock.Since(latestTime) < nearsync {
|
2020-09-08 06:16:34 +00:00
|
|
|
subscribe()
|
|
|
|
return store.ErrNotifeeDone
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-09-02 16:07:23 +00:00
|
|
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
|
|
|
|
lc fx.Lifecycle,
|
|
|
|
ps *pubsub.PubSub,
|
|
|
|
s *chain.Syncer,
|
|
|
|
bserv dtypes.ChainBlockService,
|
|
|
|
chain *store.ChainStore,
|
|
|
|
cns consensus.Consensus,
|
|
|
|
h host.Host,
|
|
|
|
nn dtypes.NetworkName) {
|
2020-09-08 06:16:34 +00:00
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
|
2020-05-12 18:05:29 +00:00
|
|
|
v := sub.NewBlockValidator(
|
2021-09-02 16:07:23 +00:00
|
|
|
h.ID(), chain, cns,
|
2020-05-12 18:05:29 +00:00
|
|
|
func(p peer.ID) {
|
|
|
|
ps.BlacklistPeer(p)
|
|
|
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
|
|
|
})
|
2020-02-17 05:51:18 +00:00
|
|
|
|
2020-03-31 23:13:37 +00:00
|
|
|
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
2020-02-17 05:51:18 +00:00
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
|
2019-07-08 14:07:09 +00:00
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
2019-07-08 14:07:09 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2023-04-27 19:18:02 +00:00
|
|
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
2020-09-08 06:16:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
|
2020-08-16 17:46:19 +00:00
|
|
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
2020-02-17 05:51:18 +00:00
|
|
|
|
2020-03-31 23:13:37 +00:00
|
|
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
2020-02-17 05:51:18 +00:00
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2020-09-08 06:16:34 +00:00
|
|
|
subscribe := func() {
|
|
|
|
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
|
|
|
|
|
|
|
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
|
|
|
}
|
|
|
|
|
|
|
|
if bootstrapper {
|
|
|
|
subscribe()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-09-10 13:22:02 +00:00
|
|
|
// wait until we are synced within 10 epochs -- env var can override
|
|
|
|
waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
|
2019-07-08 14:07:09 +00:00
|
|
|
}
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2022-02-10 16:41:18 +00:00
|
|
|
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error {
|
2022-02-04 08:15:01 +00:00
|
|
|
topicName := build.IndexerIngestTopic(nn)
|
2022-02-03 22:56:21 +00:00
|
|
|
|
2022-02-10 16:41:18 +00:00
|
|
|
v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule)
|
2022-02-03 22:56:21 +00:00
|
|
|
|
|
|
|
if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil {
|
2022-02-04 08:15:01 +00:00
|
|
|
return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
|
2022-02-03 22:56:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
topicHandle, err := ps.Join(topicName)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err)
|
|
|
|
}
|
|
|
|
cancelFunc, err := topicHandle.Relay()
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel message relay on shutdown.
|
|
|
|
lc.Append(fx.Hook{
|
|
|
|
OnStop: func(_ context.Context) error {
|
|
|
|
cancelFunc()
|
|
|
|
return nil
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
log.Infof("relaying messages for pubsub topic %s", topicName)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-09-29 11:53:30 +00:00
|
|
|
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
|
|
|
|
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
local.OnReady(marketevents.ReadyLogger("discovery"))
|
|
|
|
lc.Append(fx.Hook{
|
|
|
|
OnStart: func(ctx context.Context) error {
|
|
|
|
return local.Start(ctx)
|
|
|
|
},
|
|
|
|
})
|
|
|
|
return local, nil
|
2019-12-17 03:17:46 +00:00
|
|
|
}
|
|
|
|
|
2020-09-29 11:53:30 +00:00
|
|
|
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
|
|
|
|
return discoveryimpl.Multi(l)
|
2019-08-26 13:45:36 +00:00
|
|
|
}
|
2020-03-25 23:16:17 +00:00
|
|
|
|
2020-05-29 13:46:05 +00:00
|
|
|
type RandomBeaconParams struct {
|
|
|
|
fx.In
|
|
|
|
|
2020-06-23 19:56:03 +00:00
|
|
|
PubSub *pubsub.PubSub `optional:"true"`
|
2020-06-23 19:10:27 +00:00
|
|
|
Cs *store.ChainStore
|
2020-09-09 18:37:12 +00:00
|
|
|
DrandConfig dtypes.DrandSchedule
|
2020-06-23 19:56:03 +00:00
|
|
|
}
|
|
|
|
|
2020-09-09 18:37:12 +00:00
|
|
|
func BuiltinDrandConfig() dtypes.DrandSchedule {
|
|
|
|
return build.DrandConfigSchedule()
|
2020-05-29 13:46:05 +00:00
|
|
|
}
|
|
|
|
|
2021-12-17 09:42:09 +00:00
|
|
|
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
|
|
|
gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
|
2020-04-14 03:05:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-07-15 15:26:18 +00:00
|
|
|
shd, err := drand.BeaconScheduleFromDrandSchedule(p.DrandConfig, gen.Timestamp, p.PubSub)
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("failed to create beacon schedule: %w", err)
|
2020-09-09 18:37:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return shd, nil
|
2020-03-25 23:16:17 +00:00
|
|
|
}
|
2020-08-11 12:48:32 +00:00
|
|
|
|
|
|
|
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
2021-08-17 12:55:35 +00:00
|
|
|
jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
|
2020-08-11 12:48:32 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
lc.Append(fx.Hook{
|
|
|
|
OnStop: func(_ context.Context) error { return jrnl.Close() },
|
|
|
|
})
|
|
|
|
|
|
|
|
return jrnl, err
|
|
|
|
}
|