package modules

import (

	eventbus ""
	event ""
	pubsub ""

	discoveryimpl ""

	marketevents ""

func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
	h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)

	sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
	if err != nil {
		return xerrors.Errorf("failed to subscribe to event bus: %w", err)

	ctx := helpers.LifecycleCtx(mctx, lc)

	go func() {
		for evt := range sub.Out() {
			pic := evt.(event.EvtPeerIdentificationCompleted)
			go func() {
				if err := svc.SayHello(ctx, pic.Peer); err != nil {
					protos, _ := h.Peerstore().GetProtocols(pic.Peer)
					agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
					if protosContains(protos, hello.ProtocolID) {
						log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
					} else {
						log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
	return nil

// protosContains reports whether search is one of the entries in protos.
// A nil or empty protos slice yields false.
func protosContains(protos []string, search string) bool {
	for _, p := range protos {
		if p == search {
			return true
		}
	}
	return false
}

func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
	go pmgr.Run(helpers.LifecycleCtx(mctx, lc))

func RunChainExchange(h host.Host, svc exchange.Server) {
	h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream)     // old
	h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new

// HandleIncomingBlocks subscribes to the blocks pubsub topic for network nn,
// registers a block validator on that same topic, and starts
// sub.HandleIncomingBlocks in a goroutine with a context derived from mctx and
// lc.
//
// NOTE(review): in this view the bodies of both `if err != nil` checks, the
// closing delimiters of the validator call, and the closing brace of the
// function are not present, so how subscription or registration errors are
// handled cannot be documented from here — confirm against the complete file.
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
	if err != nil {

	// The callback handed to the validator tags the given peer "badblock" with
	// value -1000 in the host's connection manager; presumably it is invoked
	// for peers that delivered an invalid block — confirm in
	// sub.NewBlockValidator.
	v := sub.NewBlockValidator(
		h.ID(), chain, stmgr,
		func(p peer.ID) {
			h.ConnManager().TagPeer(p, "badblock", -1000)

	if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {

	// Consume the subscription in the background.
	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())

// HandleIncomingMessages subscribes to the messages pubsub topic for network
// nn, registers a message validator (built from the host ID and mpool) on that
// topic, and starts sub.HandleIncomingMessages in a goroutine with a context
// derived from mctx and lc.
//
// NOTE(review): in this view the bodies of both `if err != nil` checks and the
// closing brace of the function are not present, so how subscription or
// registration errors are handled cannot be documented from here — confirm
// against the complete file.
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
	if err != nil {

	v := sub.NewMessageValidator(h.ID(), mpool)

	if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {

	// Consume the subscription in the background.
	go sub.HandleIncomingMessages(ctx, mpool, msgsub)

func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
	local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
	if err != nil {
		return nil, err
		OnStart: func(ctx context.Context) error {
			return local.Start(ctx)
	return local, nil

func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
	return discoveryimpl.Multi(l)

type RandomBeaconParams struct {

	PubSub      *pubsub.PubSub `optional:"true"`
	Cs          *store.ChainStore
	DrandConfig dtypes.DrandSchedule

func BuiltinDrandConfig() dtypes.DrandSchedule {
	return build.DrandConfigSchedule()

func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
	gen, err := p.Cs.GetGenesis()
	if err != nil {
		return nil, err

	shd := beacon.Schedule{}
	for _, dc := range p.DrandConfig {
		bc, err := drand.NewDrandBeacon(gen.Timestamp, build.BlockDelaySecs, p.PubSub, dc.Config)
		if err != nil {
			return nil, xerrors.Errorf("creating drand beacon: %w", err)
		shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc})

	return shd, nil

func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
	jrnl, err := journal.OpenFSJournal(lr, disabled)
	if err != nil {
		return nil, err

		OnStop: func(_ context.Context) error { return jrnl.Close() },

	return jrnl, err