package events

import (
	"context"
	"sync"
	"time"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
)

// log is the package-level logger, registered under the name "events".
var log = logging.Logger("events")

// Callback signatures stored in a heightHandler.
type (
	// HeightHandler is the "handle" callback of a heightHandler. It receives a
	// tipset ts and an epoch curH; per the original note, when it is invoked
	// `curH`-`ts.Height` = `confidence`.
	HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
	// RevertHandler is the "revert" callback of a heightHandler.
	// NOTE(review): presumably invoked when ts is reverted from the chain —
	// confirm against the heightEvents implementation.
	RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

// heightHandler bundles the state kept for one height trigger: the confidence
// value associated with it, a called flag, and the handle/revert callbacks.
// Values of this type are stored (by pointer) in heightEvents.heightTriggers.
//
// NOTE(review): called presumably records whether handle has already fired so
// that revert is only run for triggers that were applied — confirm against the
// heightEvents implementation.
type heightHandler struct {
	confidence int
	called     bool

	handle HeightHandler
	revert RevertHandler
}

// EventAPI is the set of chain and state API methods that Events is
// constructed with. Within this file only ChainNotify is called directly
// (to subscribe to head changes); the value is also handed to the tipset
// cache and to hcEvents.
type EventAPI interface {
	ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
	ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
	ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
	ChainHead(context.Context) (*types.TipSet, error)
	StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
	ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)

	StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}

// Events subscribes to chain head changes through an EventAPI and forwards
// each change to the embedded heightEvents and hcEvents as well as to any
// registered TipSetObservers.
type Events struct {
	// api is the chain API used to subscribe to head-change notifications.
	api EventAPI

	// tsc is the tipset cache shared with heightEvents and hcEvents. lk is held
	// while a head change is being processed and while observers is modified.
	tsc *tipSetCache
	lk  sync.Mutex

	// ready is closed exactly once (guarded by readyOnce) after the first
	// "current" head notification has been received.
	ready     chan struct{}
	readyOnce sync.Once

	heightEvents
	*hcEvents

	// observers are notified of every reverted and applied tipset.
	observers []TipSetObserver
}

// NewEventsWithConfidence builds an Events instance backed by api, using
// gcConfidence both for the tipset cache and for the height / head-change
// event trackers. It starts the head-change listener in a background
// goroutine and then blocks until either the first tipset has been observed
// or ctx is cancelled, whichever happens first.
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
	cache := newTSCache(gcConfidence, api)

	e := &Events{
		api: api,
		tsc: cache,

		heightEvents: heightEvents{
			tsc:          cache,
			ctx:          ctx,
			gcConfidence: gcConfidence,

			heightTriggers:   make(map[uint64]*heightHandler),
			htTriggerHeights: make(map[abi.ChainEpoch][]uint64),
			htHeights:        make(map[abi.ChainEpoch][]uint64),
		},

		hcEvents:  newHCEvents(ctx, api, cache, uint64(gcConfidence)),
		ready:     make(chan struct{}),
		observers: make([]TipSetObserver, 0),
	}

	go e.listenHeadChanges(ctx)

	// Do not hand the instance back until the listener has seen its first
	// tipset, unless we are shutting down.
	select {
	case <-ctx.Done():
	case <-e.ready:
	}

	return e
}

// NewEvents is NewEventsWithConfidence with the GC confidence fixed at twice
// build.ForkLengthThreshold.
func NewEvents(ctx context.Context, api EventAPI) *Events {
	return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold)
}

// listenHeadChanges keeps a head-change subscription alive: every time
// listenHeadChangesOnce returns (with or without an error) it logs the
// outcome, waits one second on build.Clock and subscribes again. It only
// returns once ctx is done.
func (e *Events) listenHeadChanges(ctx context.Context) {
	for {
		err := e.listenHeadChangesOnce(ctx)
		if err == nil {
			log.Warn("listenHeadChanges quit")
		} else {
			log.Errorf("listen head changes errored: %s", err)
		}

		// Back off before resubscribing, but give up right away on shutdown.
		retry := build.Clock.After(time.Second)
		select {
		case <-ctx.Done():
			log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
			return
		case <-retry:
			log.Info("restarting listenHeadChanges")
		}
	}
}

// listenHeadChangesOnce runs a single ChainNotify subscription to completion.
//
// The first batch received must contain exactly one entry of type
// store.HCCurrent; it seeds the tipset cache and, the first time only, sets
// e.lastTs and closes e.ready. Every later batch is split into reverted and
// applied tipsets and passed to headChange; failures there are logged and do
// not end the subscription.
//
// It returns nil when the notification channel is closed after the initial
// batch, and an error if the subscription cannot be established, the initial
// batch is missing or malformed, or ctx is done before it arrives. Retrying
// is left to the caller.
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	notifs, err := e.api.ChainNotify(ctx)
	if err != nil {
		// The caller is responsible for retrying.
		return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
	}

	// Block until the initial batch shows up, or bail out.
	var first []*api.HeadChange
	select {
	case <-ctx.Done():
		return ctx.Err()
	case batch, open := <-notifs:
		if !open {
			return xerrors.Errorf("notification channel closed")
		}
		first = batch
	}

	if n := len(first); n != 1 {
		return xerrors.Errorf("unexpected initial head notification length: %d", n)
	}

	initial := first[0]
	if initial.Type != store.HCCurrent {
		return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", initial.Type)
	}

	if err := e.tsc.add(initial.Val); err != nil {
		log.Warnf("tsc.add: adding current tipset failed: %v", err)
	}

	e.readyOnce.Do(func() {
		e.lastTs = initial.Val
		// Let NewEventsWithConfidence know the first tipset has been seen.
		close(e.ready)
	})

	for batch := range notifs {
		var reverted, applied []*types.TipSet
		for _, change := range batch {
			switch change.Type {
			case store.HCApply:
				applied = append(applied, change.Val)
			case store.HCRevert:
				reverted = append(reverted, change.Val)
			default:
				log.Warnf("unexpected head change notification type: '%s'", change.Type)
			}
		}

		if err := e.headChange(ctx, reverted, applied); err != nil {
			log.Warnf("headChange failed: %s", err)
		}

		// sync with fake chainstore (for tests)
		if syncer, ok := e.api.(interface{ notifDone() }); ok {
			syncer.notifDone()
		}
	}

	return nil
}

// headChange processes one head change while holding e.lk. It runs, in order,
// headChangeAt, observeChanges and processHeadChangeEvent, stopping at and
// returning the first error. A change with no applied tipsets is rejected
// before the lock is taken.
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
	if len(app) == 0 {
		return xerrors.New("events.headChange expected at least one applied tipset")
	}

	e.lk.Lock()
	defer e.lk.Unlock()

	err := e.headChangeAt(rev, app)
	if err == nil {
		err = e.observeChanges(ctx, rev, app)
	}
	if err != nil {
		return err
	}

	return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets. Observers are
// registered with Events.Observe.
type TipSetObserver interface {
	// Apply is called once for each tipset applied by a head change. A
	// returned error does not stop delivery to other observers or tipsets.
	Apply(ctx context.Context, ts *types.TipSet) error
	// Revert is called once for each tipset reverted by a head change. A
	// returned error does not stop delivery to other observers or tipsets.
	Revert(ctx context.Context, ts *types.TipSet) error
}

// Observe registers obs to be notified of every tipset reverted or applied by
// subsequent head changes. The observer list is updated under e.lk. The
// returned error is always nil.
//
// TODO: add a confidence level so we can have observers with different levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
	e.lk.Lock()
	e.observers = append(e.observers, obs)
	e.lk.Unlock()

	return nil
}

// observeChanges notifies every registered observer of a head change: first
// Revert is called for each tipset in rev, then Apply for each tipset in app,
// visiting observers in registration order for every tipset.
//
// Delivery is best-effort. An observer error is logged and does not prevent
// the remaining observers or tipsets from being notified, so the returned
// error is always nil.
//
// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
	for _, ts := range rev {
		for _, o := range e.observers {
			if err := o.Revert(ctx, ts); err != nil {
				log.Warnf("tipset observer revert failed: %s", err)
			}
		}
	}

	for _, ts := range app {
		for _, o := range e.observers {
			if err := o.Apply(ctx, ts); err != nil {
				log.Warnf("tipset observer apply failed: %s", err)
			}
		}
	}

	return nil
}