package modules

import (
	"context"
	"os"
	"strconv"
	"time"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
	"go.uber.org/fx"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-fil-markets/discovery"
	discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/beacon"
	"github.com/filecoin-project/lotus/chain/beacon/drand"
	"github.com/filecoin-project/lotus/chain/consensus"
	"github.com/filecoin-project/lotus/chain/exchange"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/sub"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/journal"
	"github.com/filecoin-project/lotus/journal/fsjournal"
	"github.com/filecoin-project/lotus/lib/peermgr"
	marketevents "github.com/filecoin-project/lotus/markets/loggers"
	"github.com/filecoin-project/lotus/node/hello"
	"github.com/filecoin-project/lotus/node/impl/full"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
	"github.com/filecoin-project/lotus/node/repo"
)

// pubsubMsgsSyncEpochs is the number of epochs passed to waitForSync before
// the node subscribes to the messages pubsub topic. It defaults to 10 and can
// be overridden through the LOTUS_MSGS_SYNC_EPOCHS environment variable.
var pubsubMsgsSyncEpochs = 10

// init applies the LOTUS_MSGS_SYNC_EPOCHS override, if one is set. A value
// that does not parse as an integer is logged and the default is kept.
func init() {
	raw := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS")
	if raw == "" {
		return
	}

	epochs, err := strconv.Atoi(raw)
	if err != nil {
		log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err)
		return
	}

	pubsubMsgsSyncEpochs = epochs
}

// RunHello installs the hello protocol stream handler on the host and says
// hello to every peer for which a peer-identification-completed event is
// received on the host's event bus.
//
// It returns an error only when subscribing to the event bus fails. The
// subscription is closed when the fx lifecycle stops, so the dispatch
// goroutine started here does not outlive the node.
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
	h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)

	sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
	if err != nil {
		return xerrors.Errorf("failed to subscribe to event bus: %w", err)
	}

	ctx := helpers.LifecycleCtx(mctx, lc)

	// Release the event bus subscription on shutdown. Closing it closes the
	// channel returned by sub.Out(), which ends the range loop below instead
	// of leaving the goroutine blocked forever.
	lc.Append(fx.Hook{
		OnStop: func(_ context.Context) error {
			return sub.Close()
		},
	})

	go func() {
		for evt := range sub.Out() {
			// Only EvtPeerIdentificationCompleted was subscribed to above.
			pic := evt.(event.EvtPeerIdentificationCompleted)

			// Greet each peer in its own goroutine so one slow peer does not
			// hold up the handling of later events.
			go func() {
				err := svc.SayHello(ctx, pic.Peer)
				if err == nil {
					return
				}

				// Peerstore lookups are best-effort: the values are only used
				// to enrich the log line, so lookup errors are ignored.
				protos, _ := h.Peerstore().GetProtocols(pic.Peer)
				agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")

				// A failure is only noteworthy when the peer advertises the
				// hello protocol; otherwise it is logged at debug level.
				if protosContains(protos, hello.ProtocolID) {
					log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
				} else {
					log.Debugw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
				}
			}()
		}
	}()
	return nil
}

// protosContains reports whether search is one of the entries in protos.
// A nil or empty protos slice never contains anything.
func protosContains(protos []string, search string) bool {
	found := false
	for idx := 0; idx < len(protos) && !found; idx++ {
		found = protos[idx] == search
	}
	return found
}

// RunPeerMgr runs the peer manager in a background goroutine, handing it the
// context that helpers.LifecycleCtx derives from mctx and lc.
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
	ctx := helpers.LifecycleCtx(mctx, lc)
	go pmgr.Run(ctx)
}

// RunChainExchange registers the chain exchange server's stream handler on
// the host under exchange.ChainExchangeProtocolID.
func RunChainExchange(h host.Host, svc exchange.Server) {
	handler := svc.HandleStream
	h.SetStreamHandler(exchange.ChainExchangeProtocolID, handler)
}

// waitForSync invokes subscribe once the chain is "near sync": the minimum
// timestamp of the relevant tipset is less than epochs block delays behind
// build.Clock.
//
// If the current heaviest tipset already satisfies that, subscribe is called
// immediately. Otherwise a head-change callback is registered, and subscribe
// is called from it the first time a batch of applied tipsets is recent
// enough.
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
	window := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second

	// recent reports whether t falls inside the near-sync window.
	recent := func(t time.Time) bool {
		return build.Clock.Since(t) < window
	}

	// Fast path: the node may already be synced at start up.
	head := stmgr.ChainStore().GetHeaviestTipSet()
	if recent(time.Unix(int64(head.MinTimestamp()), 0)) {
		subscribe()
		return
	}

	// Not synced yet: watch head changes until an applied tipset is recent.
	stmgr.ChainStore().SubscribeHeadChanges(func(_, applied []*types.TipSet) error {
		if len(applied) == 0 {
			return nil
		}

		// Use the newest minimum timestamp among the applied tipsets.
		newest := applied[0].MinTimestamp()
		for i := 1; i < len(applied); i++ {
			if t := applied[i].MinTimestamp(); t > newest {
				newest = t
			}
		}

		if !recent(time.Unix(int64(newest), 0)) {
			return nil
		}

		subscribe()
		// NOTE(review): presumably tells the chain store to drop this
		// callback so subscribe runs only once; confirm against store.
		return store.ErrNotifeeDone
	})
}

// HandleIncomingBlocks registers a block validator for the network's blocks
// pubsub topic, subscribes to that topic, and starts a goroutine that feeds
// the subscription to sub.HandleIncomingBlocks.
//
// It panics if the validator cannot be registered or the subscription fails.
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
	lc fx.Lifecycle,
	ps *pubsub.PubSub,
	s *chain.Syncer,
	bserv dtypes.ChainBlockService,
	chain *store.ChainStore,
	cns consensus.Consensus,
	h host.Host,
	nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)
	topic := build.BlocksTopic(nn)

	// Callback handed to the block validator: it blacklists the peer in
	// pubsub and gives it a strongly negative connection-manager tag.
	// NOTE(review): presumably invoked for peers that send bad blocks.
	penalize := func(p peer.ID) {
		ps.BlacklistPeer(p)
		h.ConnManager().TagPeer(p, "badblock", -1000)
	}
	validator := sub.NewBlockValidator(h.ID(), chain, cns, penalize)

	if err := ps.RegisterTopicValidator(topic, validator.Validate); err != nil {
		panic(err)
	}

	log.Infof("subscribing to pubsub topic %s", topic)

	blocksub, err := ps.Subscribe(topic) //nolint
	if err != nil {
		panic(err)
	}

	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}

// HandleIncomingMessages registers a message validator for the network's
// messages pubsub topic and arranges for the node to subscribe to it.
//
// A bootstrapper subscribes immediately. Any other node subscribes only once
// waitForSync decides the chain is within pubsubMsgsSyncEpochs epochs of the
// clock. It panics if the validator cannot be registered or the subscription
// fails.
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
	ctx := helpers.LifecycleCtx(mctx, lc)
	topic := build.MessagesTopic(nn)

	validator := sub.NewMessageValidator(h.ID(), mpool)
	if err := ps.RegisterTopicValidator(topic, validator.Validate); err != nil {
		panic(err)
	}

	// subscribe joins the messages topic and starts the handling goroutine.
	subscribe := func() {
		log.Infof("subscribing to pubsub topic %s", topic)

		msgsub, err := ps.Subscribe(topic) //nolint
		if err != nil {
			panic(err)
		}

		go sub.HandleIncomingMessages(ctx, mpool, msgsub)
	}

	if !bootstrapper {
		// Defer the subscription until the chain is nearly synced; the
		// threshold defaults to 10 epochs and can be overridden by env var.
		waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe)
		return
	}

	subscribe()
}

// RelayIndexerMessages registers a validator for the indexer ingest pubsub
// topic, joins the topic, and enables relaying on it. The relay is cancelled
// when the fx lifecycle stops.
//
// It returns an error if registering the validator, joining the topic, or
// enabling the relay fails.
func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error {
	topicName := build.IndexerIngestTopic(nn)
	validator := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule)

	err := ps.RegisterTopicValidator(topicName, validator.Validate)
	if err != nil {
		return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err)
	}

	topic, err := ps.Join(topicName)
	if err != nil {
		return xerrors.Errorf("failed to join pubsub topic %s: %w", topicName, err)
	}

	stopRelay, err := topic.Relay()
	if err != nil {
		return xerrors.Errorf("failed to relay to pubsub messages for topic %s: %w", topicName, err)
	}

	// Stop relaying when the node shuts down.
	lc.Append(fx.Hook{
		OnStop: func(context.Context) error {
			stopRelay()
			return nil
		},
	})

	log.Infof("relaying messages for pubsub topic %s", topicName)
	return nil
}

// NewLocalDiscovery builds a local discovery instance backed by the metadata
// datastore namespaced under "/deals/local", attaches a ready logger to it,
// and schedules its Start on the fx lifecycle.
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
	dealsDS := namespace.Wrap(ds, datastore.NewKey("/deals/local"))

	local, err := discoveryimpl.NewLocal(dealsDS)
	if err != nil {
		return nil, err
	}

	local.OnReady(marketevents.ReadyLogger("discovery"))

	startHook := fx.Hook{
		OnStart: func(startCtx context.Context) error {
			return local.Start(startCtx)
		},
	}
	lc.Append(startHook)

	return local, nil
}

// RetrievalResolver wraps the local discovery instance with
// discoveryimpl.Multi and returns the result as a discovery.PeerResolver.
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
	resolver := discoveryimpl.Multi(l)
	return resolver
}

// RandomBeaconParams bundles the dependencies consumed by RandomSchedule.
// It embeds fx.In, so it is presumably populated by fx dependency injection.
type RandomBeaconParams struct {
	fx.In

	// PubSub is tagged optional; RandomSchedule forwards it unchanged to
	// drand.NewDrandBeacon.
	PubSub      *pubsub.PubSub `optional:"true"`
	// Cs is the chain store RandomSchedule uses to fetch the genesis block.
	Cs          *store.ChainStore
	// DrandConfig is the list of drand configurations; RandomSchedule reads
	// each entry's Start and Config.
	DrandConfig dtypes.DrandSchedule
}

// BuiltinDrandConfig returns the drand schedule provided by
// build.DrandConfigSchedule.
func BuiltinDrandConfig() dtypes.DrandSchedule {
	schedule := build.DrandConfigSchedule()
	return schedule
}

// RandomSchedule builds the beacon schedule: one drand beacon per entry in
// p.DrandConfig, each paired with that entry's Start value.
//
// The genesis block is loaded from the chain store because its timestamp is
// passed to every beacon. An error is returned if the genesis block cannot be
// loaded or any beacon cannot be created.
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	genesis, err := p.Cs.GetGenesis(ctx)
	if err != nil {
		return nil, err
	}

	schedule := make(beacon.Schedule, 0, len(p.DrandConfig))
	for _, entry := range p.DrandConfig {
		drandBeacon, err := drand.NewDrandBeacon(genesis.Timestamp, build.BlockDelaySecs, p.PubSub, entry.Config)
		if err != nil {
			return nil, xerrors.Errorf("creating drand beacon: %w", err)
		}

		point := beacon.BeaconPoint{Start: entry.Start, Beacon: drandBeacon}
		schedule = append(schedule, point)
	}

	return schedule, nil
}

// OpenFilesystemJournal opens the filesystem journal for the locked repo with
// the given disabled events, and registers a lifecycle stop hook that closes
// it. The open error, if any, is returned unchanged.
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
	jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
	if err != nil {
		return nil, err
	}

	closeOnStop := func(context.Context) error {
		return jrnl.Close()
	}
	lc.Append(fx.Hook{OnStop: closeOnStop})

	return jrnl, nil
}