package modules

import (
	"context"
	"os"
	"strconv"
	"time"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	eventbus "github.com/libp2p/go-eventbus"
	event "github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"go.uber.org/fx"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-fil-markets/discovery"
	discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/beacon"
	"github.com/filecoin-project/lotus/chain/beacon/drand"
	"github.com/filecoin-project/lotus/chain/exchange"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/sub"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/journal"
	"github.com/filecoin-project/lotus/lib/peermgr"
	marketevents "github.com/filecoin-project/lotus/markets/loggers"
	"github.com/filecoin-project/lotus/node/hello"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
	"github.com/filecoin-project/lotus/node/repo"
)

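// pubsubMsgsSyncEpochs is how close to the chain head (in epochs) the node
// must be before it subscribes to the incoming messages topic; it can be
// overridden with the LOTUS_MSGS_SYNC_EPOCHS environment variable.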
var pubsubMsgsSyncEpochs = 10

func init() {
	if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" {
		val, err := strconv.Atoi(s)
		if err != nil {
			log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
			return
		}
		pubsubMsgsSyncEpochs = val
	}
}

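// RunHello registers the hello protocol stream handler and greets every peer
// whose identification completes on the host's event bus.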
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
	h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)

	sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
	if err != nil {
		return xerrors.Errorf("failed to subscribe to event bus: %w", err)
	}

	ctx := helpers.LifecycleCtx(mctx, lc)

	go func() {
		for evt := range sub.Out() {
			pic := evt.(event.EvtPeerIdentificationCompleted)
			go func() {
				if err := svc.SayHello(ctx, pic.Peer); err != nil {
					protos, _ := h.Peerstore().GetProtocols(pic.Peer)
					agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
					if protosContains(protos, hello.ProtocolID) {
						log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
					} else {
						log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
					}
					return
				}
			}()
		}
	}()
	return nil
}

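// protosContains reports whether search is one of the protocols in protos.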
func protosContains(protos []string, search string) bool {
	for _, p := range protos {
		if p == search {
			return true
		}
	}
	return false
}

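// RunPeerMgr starts the peer manager for the lifetime of the node.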
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
	go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}

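// RunChainExchange registers the chain exchange server on both the legacy
// blocksync protocol and the current chain exchange protocol.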
func RunChainExchange(h host.Host, svc exchange.Server) {
	h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream)     // old
	h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
}

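// waitForSync invokes subscribe once the chain head is within the given
// number of epochs of the current wall-clock time.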
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
	nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second

	// early check, are we synced at start up?
	ts := stmgr.ChainStore().GetHeaviestTipSet()
	timestamp := ts.MinTimestamp()
	timestampTime := time.Unix(int64(timestamp), 0)
	if build.Clock.Since(timestampTime) < nearsync {
		subscribe()
		return
	}

	// we are not synced, subscribe to head changes and wait for sync
	stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
		if len(app) == 0 {
			return nil
		}

		latest := app[0].MinTimestamp()
		for _, ts := range app[1:] {
			timestamp := ts.MinTimestamp()
			if timestamp > latest {
				latest = timestamp
			}
		}

		latestTime := time.Unix(int64(latest), 0)
		if build.Clock.Since(latestTime) < nearsync {
			subscribe()
			return store.ErrNotifeeDone
		}

		return nil
	})
}

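// HandleIncomingBlocks subscribes to the blocks pubsub topic, validating
// incoming blocks and feeding them to the syncer.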
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	v := sub.NewBlockValidator(
		h.ID(), chain, stmgr,
		func(p peer.ID) {
			ps.BlacklistPeer(p)
			h.ConnManager().TagPeer(p, "badblock", -1000)
		})

	if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
		panic(err)
	}

	log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))

	blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
	if err != nil {
		panic(err)
	}

	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}

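// HandleIncomingMessages subscribes to the messages pubsub topic and feeds
// validated messages into the message pool; non-bootstrapper nodes delay the
// subscription until they are nearly synced.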
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	v := sub.NewMessageValidator(h.ID(), mpool)

	if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
		panic(err)
	}

	subscribe := func() {
		log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))

		msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
		if err != nil {
			panic(err)
		}

		go sub.HandleIncomingMessages(ctx, mpool, msgsub)
	}

	if bootstrapper {
		subscribe()
		return
	}

	// wait until we are synced within 10 epochs -- env var can override
	waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
}

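// NewLocalDiscovery constructs the local deal discovery store, backed by the
// metadata datastore under /deals/local, and starts it with the fx lifecycle.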
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
	local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
	if err != nil {
		return nil, err
	}
	local.OnReady(marketevents.ReadyLogger("discovery"))
	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			return local.Start(ctx)
		},
	})
	return local, nil
}

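// RetrievalResolver exposes the local discovery store as a retrieval peer resolver.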
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
	return discoveryimpl.Multi(l)
}

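// RandomBeaconParams groups the dependencies needed to build the random
// beacon schedule; PubSub is optional.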
type RandomBeaconParams struct {
	fx.In

	PubSub      *pubsub.PubSub `optional:"true"`
	Cs          *store.ChainStore
	DrandConfig dtypes.DrandSchedule
}

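// BuiltinDrandConfig returns the drand schedule built into this binary.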
func BuiltinDrandConfig() dtypes.DrandSchedule {
	return build.DrandConfigSchedule()
}

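// RandomSchedule builds a drand beacon for every entry in the configured
// drand schedule, anchored at the genesis timestamp.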
func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
	gen, err := p.Cs.GetGenesis()
	if err != nil {
		return nil, err
	}

	shd := beacon.Schedule{}
	for _, dc := range p.DrandConfig {
		bc, err := drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, dc.Config)
		if err != nil {
			return nil, xerrors.Errorf("creating drand beacon: %w", err)
		}
		shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc})
	}

	return shd, nil
}

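// OpenFilesystemJournal opens the filesystem-backed journal in the locked
// repo and closes it when the node shuts down.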
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
	jrnl, err := journal.OpenFSJournal(lr, disabled)
	if err != nil {
		return nil, err
	}

	lc.Append(fx.Hook{
		OnStop: func(_ context.Context) error { return jrnl.Close() },
	})

	return jrnl, err
}