package storage

import (
	"context"
	"sync"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"

	"github.com/filecoin-project/go-state-types/dline"
	"github.com/filecoin-project/lotus/chain/types"
)

const (
	SubmitConfidence    = 4
	ChallengeConfidence = 10
)

type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
type CompleteSubmitPoSTCb func(err error)

// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// by the changeHandler to execute actions and query state.
type wdPoStCommands interface {
	StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)

	startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
	startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
	onAbort(ts *types.TipSet, deadline *dline.Info)
	recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
}

type changeHandler struct {
	api        wdPoStCommands
	actor      address.Address
	proveHdlr  *proveHandler
	submitHdlr *submitHandler
}

func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
	posts := newPostsCache()
	p := newProver(api, posts)
	s := newSubmitter(api, posts)
	return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s}
}

func (ch *changeHandler) start() {
	go ch.proveHdlr.run()
	go ch.submitHdlr.run()
}

func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
	// Get the current deadline period
	di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
	if err != nil {
		return err
	}

	if !di.PeriodStarted() {
		return nil // not proving anything yet
	}

	hc := &headChange{
		ctx:     ctx,
		revert:  revert,
		advance: advance,
		di:      di,
	}

	select {
	case ch.proveHdlr.hcs <- hc:
	case <-ch.proveHdlr.shutdownCtx.Done():
	case <-ctx.Done():
	}

	select {
	case ch.submitHdlr.hcs <- hc:
	case <-ch.submitHdlr.shutdownCtx.Done():
	case <-ctx.Done():
	}

	return nil
}

func (ch *changeHandler) shutdown() {
	ch.proveHdlr.shutdown()
	ch.submitHdlr.shutdown()
}

func (ch *changeHandler) currentTSDI() (*types.TipSet, *dline.Info) {
	return ch.submitHdlr.currentTSDI()
}

// postsCache keeps a cache of PoSTs for each proving window
type postsCache struct {
	added chan *postInfo
	lk    sync.RWMutex
	cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams
}

func newPostsCache() *postsCache {
	return &postsCache{
		added: make(chan *postInfo, 16),
		cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams),
	}
}

func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) {
	c.lk.Lock()
	defer c.lk.Unlock()

	// TODO: clear cache entries older than chain finality
	c.cache[di.Open] = posts

	c.added <- &postInfo{
		di:    di,
		posts: posts,
	}
}

func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) {
	c.lk.RLock()
	defer c.lk.RUnlock()

	posts, ok := c.cache[di.Open]
	return posts, ok
}

type headChange struct {
	ctx     context.Context
	revert  *types.TipSet
	advance *types.TipSet
	di      *dline.Info
}

type currentPost struct {
	di    *dline.Info
	abort context.CancelFunc
}

type postResult struct {
	ts       *types.TipSet
	currPost *currentPost
	posts    []miner.SubmitWindowedPoStParams
	err      error
}

// proveHandler generates proofs
type proveHandler struct {
	api   wdPoStCommands
	posts *postsCache

	postResults chan *postResult
	hcs         chan *headChange

	current *currentPost

	shutdownCtx context.Context
	shutdown    context.CancelFunc

	// Used for testing
	processedHeadChanges chan *headChange
	processedPostResults chan *postResult
}

func newProver(
	api wdPoStCommands,
	posts *postsCache,
) *proveHandler {
	ctx, cancel := context.WithCancel(context.Background())
	return &proveHandler{
		api:         api,
		posts:       posts,
		postResults: make(chan *postResult),
		hcs:         make(chan *headChange),
		shutdownCtx: ctx,
		shutdown:    cancel,
	}
}

func (p *proveHandler) run() {
	// Abort proving on shutdown
	defer func() {
		if p.current != nil {
			p.current.abort()
		}
	}()

	for p.shutdownCtx.Err() == nil {
		select {
		case <-p.shutdownCtx.Done():
			return

		case hc := <-p.hcs:
			// Head changed
			p.processHeadChange(hc.ctx, hc.advance, hc.di)
			if p.processedHeadChanges != nil {
				p.processedHeadChanges <- hc
			}

		case res := <-p.postResults:
			// Proof generation complete
			p.processPostResult(res)
			if p.processedPostResults != nil {
				p.processedPostResults <- res
			}
		}
	}
}

func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
	// If the post window has expired, abort the current proof
	if p.current != nil && newTS.Height() >= p.current.di.Close {
		// Cancel the context on the current proof
		p.current.abort()

		// Clear out the reference to the proof so that we can immediately
		// start generating a new proof, without having to worry about state
		// getting clobbered when the abort completes
		p.current = nil
	}

	// Only generate one proof at a time
	if p.current != nil {
		return
	}

	// If the proof for the current post window has been generated, check the
	// next post window
	_, complete := p.posts.get(di)
	for complete {
		di = nextDeadline(di)
		_, complete = p.posts.get(di)
	}

	// Check if the chain is above the Challenge height for the post window
	if newTS.Height() < di.Challenge+ChallengeConfidence {
		return
	}

	p.current = &currentPost{di: di}
	curr := p.current
	p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
		p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
	})
}

func (p *proveHandler) processPostResult(res *postResult) {
	di := res.currPost.di
	if res.err != nil {
		// Proving failed so inform the API
		p.api.recordPoStFailure(res.err, res.ts, di)
		log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
		p.api.onAbort(res.ts, di)

		// Check if the current post has already been aborted
		if p.current == res.currPost {
			// If the current post was not already aborted, setting it to nil
			// marks it as complete so that a new post can be started
			p.current = nil
		}
		return
	}

	// Completed processing this proving window
	p.current = nil

	// Add the proofs to the cache
	p.posts.add(di, res.posts)
}

type submitResult struct {
	pw  *postWindow
	err error
}

type SubmitState string

const (
	SubmitStateStart      SubmitState = "SubmitStateStart"
	SubmitStateSubmitting SubmitState = "SubmitStateSubmitting"
	SubmitStateComplete   SubmitState = "SubmitStateComplete"
)

type postWindow struct {
	ts          *types.TipSet
	di          *dline.Info
	submitState SubmitState
	abort       context.CancelFunc
}

type postInfo struct {
	di    *dline.Info
	posts []miner.SubmitWindowedPoStParams
}

// submitHandler submits proofs on-chain
type submitHandler struct {
	api   wdPoStCommands
	posts *postsCache

	submitResults chan *submitResult
	hcs           chan *headChange

	postWindows       map[abi.ChainEpoch]*postWindow
	getPostWindowReqs chan *getPWReq

	shutdownCtx context.Context
	shutdown    context.CancelFunc

	currentCtx context.Context
	currentTS  *types.TipSet
	currentDI  *dline.Info
	getTSDIReq chan chan *tsdi

	// Used for testing
	processedHeadChanges   chan *headChange
	processedSubmitResults chan *submitResult
	processedPostReady     chan *postInfo
}

func newSubmitter(
	api wdPoStCommands,
	posts *postsCache,
) *submitHandler {
	ctx, cancel := context.WithCancel(context.Background())
	return &submitHandler{
		api:               api,
		posts:             posts,
		submitResults:     make(chan *submitResult),
		hcs:               make(chan *headChange),
		postWindows:       make(map[abi.ChainEpoch]*postWindow),
		getPostWindowReqs: make(chan *getPWReq),
		getTSDIReq:        make(chan chan *tsdi),
		shutdownCtx:       ctx,
		shutdown:          cancel,
	}
}

func (s *submitHandler) run() {
	// On shutdown, abort in-progress submits
	defer func() {
		for _, pw := range s.postWindows {
			if pw.abort != nil {
				pw.abort()
			}
		}
	}()

	for s.shutdownCtx.Err() == nil {
		select {
		case <-s.shutdownCtx.Done():
			return

		case hc := <-s.hcs:
			// Head change
			s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di)
			if s.processedHeadChanges != nil {
				s.processedHeadChanges <- hc
			}

		case pi := <-s.posts.added:
			// Proof generated
			s.processPostReady(pi)
			if s.processedPostReady != nil {
				s.processedPostReady <- pi
			}

		case res := <-s.submitResults:
			// Submit complete
			s.processSubmitResult(res)
			if s.processedSubmitResults != nil {
				s.processedSubmitResults <- res
			}

		case pwreq := <-s.getPostWindowReqs:
			// used by getPostWindow() to sync with run loop
			pwreq.out <- s.postWindows[pwreq.di.Open]

		case out := <-s.getTSDIReq:
			// used by currentTSDI() to sync with run loop
			out <- &tsdi{ts: s.currentTS, di: s.currentDI}
		}
	}
}

// processHeadChange is called when the chain head changes
func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
	s.currentCtx = ctx
	s.currentTS = advance
	s.currentDI = di

	// Start tracking the current post window if we're not already
	// TODO: clear post windows older than chain finality
	if _, ok := s.postWindows[di.Open]; !ok {
		s.postWindows[di.Open] = &postWindow{
			di:          di,
			ts:          advance,
			submitState: SubmitStateStart,
		}
	}

	// Apply the change to all post windows
	for _, pw := range s.postWindows {
		s.processHeadChangeForPW(ctx, revert, advance, pw)
	}
}

func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) {
	revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open
	expired := advance.Height() >= pw.di.Close

	// If the chain was reverted back to the previous deadline, or if the post
	// window has expired, abort submit
	if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) {
		// Replace the aborted postWindow with a new one so that we can
		// submit again at any time without the state getting clobbered
		// when the abort completes
		abort := pw.abort
		if abort != nil {
			pw = &postWindow{
				di:          pw.di,
				ts:          advance,
				submitState: SubmitStateStart,
			}
			s.postWindows[pw.di.Open] = pw

			// Abort the current submit
			abort()
		}
	} else if pw.submitState == SubmitStateComplete && revertedToPrevDL {
		// If submit for this deadline has completed, but the chain was
		// reverted back to the previous deadline, reset the submit state to the
		// starting state, so that it can be resubmitted
		pw.submitState = SubmitStateStart
	}

	// Submit the proof to chain if the proof has been generated and the chain
	// height is above confidence
	s.submitIfReady(ctx, advance, pw)
}

// processPostReady is called when a proof generation completes
func (s *submitHandler) processPostReady(pi *postInfo) {
	pw, ok := s.postWindows[pi.di.Open]
	if ok {
		s.submitIfReady(s.currentCtx, s.currentTS, pw)
	}
}

// submitIfReady submits a proof if the chain is high enough and the proof
// has been generated for this deadline
func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) {
	// If the window has expired, there's nothing more to do.
	if advance.Height() >= pw.di.Close {
		return
	}

	// Check if we're already submitting, or already completed submit
	if pw.submitState != SubmitStateStart {
		return
	}

	// Check if we've reached the confidence height to submit
	if advance.Height() < pw.di.Open+SubmitConfidence {
		return
	}

	// Check if the proofs have been generated for this deadline
	posts, ok := s.posts.get(pw.di)
	if !ok {
		return
	}

	// If there was nothing to prove, move straight to the complete state
	if len(posts) == 0 {
		pw.submitState = SubmitStateComplete
		return
	}

	// Start submitting post
	pw.submitState = SubmitStateSubmitting
	pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) {
		s.submitResults <- &submitResult{pw: pw, err: err}
	})
}

// processSubmitResult is called with the response to a submit
func (s *submitHandler) processSubmitResult(res *submitResult) {
	if res.err != nil {
		// Submit failed so inform the API and go back to the start state
		s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
		log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
		s.api.onAbort(res.pw.ts, res.pw.di)

		res.pw.submitState = SubmitStateStart
		return
	}

	// Submit succeeded so move to complete state
	res.pw.submitState = SubmitStateComplete
}

type tsdi struct {
	ts *types.TipSet
	di *dline.Info
}

func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) {
	out := make(chan *tsdi)
	s.getTSDIReq <- out
	res := <-out
	return res.ts, res.di
}

type getPWReq struct {
	di  *dline.Info
	out chan *postWindow
}

func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
	out := make(chan *postWindow)
	s.getPostWindowReqs <- &getPWReq{di: di, out: out}
	return <-out
}

// nextDeadline gets deadline info for the subsequent deadline
func nextDeadline(currentDeadline *dline.Info) *dline.Info {
	periodStart := currentDeadline.PeriodStart
	newDeadline := currentDeadline.Index + 1
	if newDeadline == miner.WPoStPeriodDeadlines {
		newDeadline = 0
		periodStart = periodStart + miner.WPoStProvingPeriod
	}

	return NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch)
}

func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
	return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod, miner.WPoStChallengeWindow, miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
}