package sectorblocks

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"io"
	"sync"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-datastore/query"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
	"golang.org/x/xerrors"

	cborutil "github.com/filecoin-project/go-cbor-util"
	"github.com/filecoin-project/go-state-types/abi"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/storage"
)

type SealSerialization uint8

const (
	SerializationUnixfs0 SealSerialization = 'u'
)

var dsPrefix = datastore.NewKey("/sealedblocks")

var ErrNotFound = errors.New("not found")

func DealIDToDsKey(dealID abi.DealID) datastore.Key {
	buf := make([]byte, binary.MaxVarintLen64)
	size := binary.PutUvarint(buf, uint64(dealID))
	return dshelp.NewKeyFromBinary(buf[:size])
}

func DsKeyToDealID(key datastore.Key) (uint64, error) {
	buf, err := dshelp.BinaryFromDsKey(key)
	if err != nil {
		return 0, err
	}
	dealID, _ := binary.Uvarint(buf)
	return dealID, nil
}

type SectorBlocks struct {
	*storage.Miner

	keys  datastore.Batching
	keyLk sync.Mutex
}

func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
	sbc := &SectorBlocks{
		Miner: miner,
		keys:  namespace.Wrap(ds, dsPrefix),
	}

	return sbc
}

func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error {
	st.keyLk.Lock() // TODO: make this multithreaded
	defer st.keyLk.Unlock()

	v, err := st.keys.Get(DealIDToDsKey(dealID))
	if err == datastore.ErrNotFound {
		err = nil
	}
	if err != nil {
		return xerrors.Errorf("getting existing refs: %w", err)
	}

	var refs api.SealedRefs
	if len(v) > 0 {
		if err := cborutil.ReadCborRPC(bytes.NewReader(v), &refs); err != nil {
			return xerrors.Errorf("decoding existing refs: %w", err)
		}
	}

	refs.Refs = append(refs.Refs, api.SealedRef{
		SectorID: sectorID,
		Offset:   offset,
		Size:     size,
	})

	newRef, err := cborutil.Dump(&refs)
	if err != nil {
		return xerrors.Errorf("serializing refs: %w", err)
	}
	return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow
}

func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d sealing.DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
	sn, offset, err := st.Miner.AddPieceToAnySector(ctx, size, r, d)
	if err != nil {
		return 0, 0, err
	}

	// TODO: DealID has very low finality here
	err = st.writeRef(d.DealID, sn, offset, size)
	if err != nil {
		return 0, 0, xerrors.Errorf("writeRef: %w", err)
	}

	return sn, offset, nil
}

func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) {
	res, err := st.keys.Query(query.Query{})
	if err != nil {
		return nil, err
	}

	ents, err := res.Rest()
	if err != nil {
		return nil, err
	}

	out := map[uint64][]api.SealedRef{}
	for _, ent := range ents {
		dealID, err := DsKeyToDealID(datastore.RawKey(ent.Key))
		if err != nil {
			return nil, err
		}

		var refs api.SealedRefs
		if err := cborutil.ReadCborRPC(bytes.NewReader(ent.Value), &refs); err != nil {
			return nil, err
		}

		out[dealID] = refs.Refs
	}

	return out, nil
}

func (st *SectorBlocks) GetRefs(dealID abi.DealID) ([]api.SealedRef, error) { // TODO: track local sectors
	ent, err := st.keys.Get(DealIDToDsKey(dealID))
	if err == datastore.ErrNotFound {
		err = ErrNotFound
	}
	if err != nil {
		return nil, err
	}

	var refs api.SealedRefs
	if err := cborutil.ReadCborRPC(bytes.NewReader(ent), &refs); err != nil {
		return nil, err
	}

	return refs.Refs, nil
}

func (st *SectorBlocks) GetSize(dealID abi.DealID) (uint64, error) {
	refs, err := st.GetRefs(dealID)
	if err != nil {
		return 0, err
	}

	return uint64(refs[0].Size), nil
}

func (st *SectorBlocks) Has(dealID abi.DealID) (bool, error) {
	// TODO: ensure sector is still there
	return st.keys.Has(DealIDToDsKey(dealID))
}