package rpcenc

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"reflect"
	"strconv"
	"sync"
	"time"

	"github.com/google/uuid"
	logging "github.com/ipfs/go-log/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-jsonrpc"
	"github.com/filecoin-project/go-state-types/abi"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
)

var log = logging.Logger("rpcenc")

var Timeout = 30 * time.Second

type StreamType string

const (
	Null       StreamType = "null"
	PushStream StreamType = "push"
	// TODO: Data transfer handoff to workers?
)

type ReaderStream struct {
	Type StreamType
	Info string
}

func ReaderParamEncoder(addr string) jsonrpc.Option {
	return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
		r := value.Interface().(io.Reader)

		if r, ok := r.(*sealing.NullReader); ok {
			return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
		}

		reqID := uuid.New()
		u, err := url.Parse(addr)
		if err != nil {
			return reflect.Value{}, xerrors.Errorf("parsing push address: %w", err)
		}
		u.Path = path.Join(u.Path, reqID.String())

		go func() {
			// TODO: figure out errors here

			resp, err := http.Post(u.String(), "application/octet-stream", r)
			if err != nil {
				log.Errorf("sending reader param: %+v", err)
				return
			}

			defer resp.Body.Close() //nolint:errcheck

			if resp.StatusCode != 200 {
				b, _ := ioutil.ReadAll(resp.Body)
				log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b))
				return
			}

		}()

		return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
	})
}

type waitReadCloser struct {
	io.ReadCloser
	wait chan struct{}
}

func (w *waitReadCloser) Read(p []byte) (int, error) {
	n, err := w.ReadCloser.Read(p)
	if err != nil {
		close(w.wait)
	}
	return n, err
}

func (w *waitReadCloser) Close() error {
	close(w.wait)
	return w.ReadCloser.Close()
}

func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
	var readersLk sync.Mutex
	readers := map[uuid.UUID]chan *waitReadCloser{}

	hnd := func(resp http.ResponseWriter, req *http.Request) {
		strId := path.Base(req.URL.Path)
		u, err := uuid.Parse(strId)
		if err != nil {
			http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
			return
		}

		readersLk.Lock()
		ch, found := readers[u]
		if !found {
			ch = make(chan *waitReadCloser)
			readers[u] = ch
		}
		readersLk.Unlock()

		wr := &waitReadCloser{
			ReadCloser: req.Body,
			wait:       make(chan struct{}),
		}

		tctx, cancel := context.WithTimeout(req.Context(), Timeout)
		defer cancel()

		select {
		case ch <- wr:
		case <-tctx.Done():
			close(ch)
			log.Errorf("context error in reader stream handler (1): %v", tctx.Err())
			resp.WriteHeader(500)
			return
		}

		select {
		case <-wr.wait:
		case <-req.Context().Done():
			log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
			resp.WriteHeader(500)
			return
		}

		resp.WriteHeader(200)
	}

	dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
		var rs ReaderStream
		if err := json.Unmarshal(b, &rs); err != nil {
			return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
		}

		if rs.Type == Null {
			n, err := strconv.ParseInt(rs.Info, 10, 64)
			if err != nil {
				return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
			}

			return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
		}

		u, err := uuid.Parse(rs.Info)
		if err != nil {
			return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
		}

		readersLk.Lock()
		ch, found := readers[u]
		if !found {
			ch = make(chan *waitReadCloser)
			readers[u] = ch
		}
		readersLk.Unlock()

		ctx, cancel := context.WithTimeout(ctx, Timeout)
		defer cancel()

		select {
		case wr, ok := <-ch:
			if !ok {
				return reflect.Value{}, xerrors.Errorf("handler timed out")
			}

			return reflect.ValueOf(wr), nil
		case <-ctx.Done():
			return reflect.Value{}, ctx.Err()
		}
	})

	return hnd, dec
}