// stm: #unit
package badgerbs

import (
	"context"
	"fmt"
	"io"
	"reflect"
	"strings"
	"testing"

	u "github.com/ipfs/boxo/util"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/lotus/blockstore"
)

// TODO: move this to go-ipfs-blockstore.
type Suite struct {
	NewBlockstore  func(tb testing.TB) (bs blockstore.BasicBlockstore, path string)
	OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error)
}

func (s *Suite) RunTests(t *testing.T, prefix string) {
	v := reflect.TypeOf(s)
	f := func(t *testing.T) {
		for i := 0; i < v.NumMethod(); i++ {
			if m := v.Method(i); strings.HasPrefix(m.Name, "Test") {
				f := m.Func.Interface().(func(*Suite, *testing.T))
				t.Run(m.Name, func(t *testing.T) {
					f(s, t)
				})
			}
		}
	}

	if prefix == "" {
		f(t)
	} else {
		t.Run(prefix, f)
	}
}

func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_GET_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	c := cid.NewCidV0(u.Hash([]byte("stuff")))
	bl, err := bs.Get(ctx, c)
	require.Nil(t, bl)
	require.Equal(t, ipld.ErrNotFound{Cid: c}, err)
}

func (s *Suite) TestGetWhenKeyIsNil(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_GET_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	_, err := bs.Get(ctx, cid.Undef)
	require.Equal(t, ipld.ErrNotFound{Cid: cid.Undef}, err)
}

func (s *Suite) TestPutThenGetBlock(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_GET_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	orig := blocks.NewBlock([]byte("some data"))

	err := bs.Put(ctx, orig)
	require.NoError(t, err)

	fetched, err := bs.Get(ctx, orig.Cid())
	require.NoError(t, err)
	require.Equal(t, orig.RawData(), fetched.RawData())
}

func (s *Suite) TestHas(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_HAS_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	orig := blocks.NewBlock([]byte("some data"))

	err := bs.Put(ctx, orig)
	require.NoError(t, err)

	ok, err := bs.Has(ctx, orig.Cid())
	require.NoError(t, err)
	require.True(t, ok)

	ok, err = bs.Has(ctx, blocks.NewBlock([]byte("another thing")).Cid())
	require.NoError(t, err)
	require.False(t, ok)
}

func (s *Suite) TestCidv0v1(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_GET_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	orig := blocks.NewBlock([]byte("some data"))

	err := bs.Put(ctx, orig)
	require.NoError(t, err)

	fetched, err := bs.Get(ctx, cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash()))
	require.NoError(t, err)
	require.Equal(t, orig.RawData(), fetched.RawData())
}

func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_GET_SIZE_001
	ctx := context.Background()

	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	block := blocks.NewBlock([]byte("some data"))
	missingBlock := blocks.NewBlock([]byte("missingBlock"))
	emptyBlock := blocks.NewBlock([]byte{})

	err := bs.Put(ctx, block)
	require.NoError(t, err)

	blockSize, err := bs.GetSize(ctx, block.Cid())
	require.NoError(t, err)
	require.Len(t, block.RawData(), blockSize)

	err = bs.Put(ctx, emptyBlock)
	require.NoError(t, err)

	emptySize, err := bs.GetSize(ctx, emptyBlock.Cid())
	require.NoError(t, err)
	require.Zero(t, emptySize)

	missingCid := missingBlock.Cid()
	missingSize, err := bs.GetSize(ctx, missingCid)
	require.Equal(t, ipld.ErrNotFound{Cid: missingCid}, err)
	require.Equal(t, -1, missingSize)
}

func (s *Suite) TestAllKeysSimple(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	keys := insertBlocks(t, bs, 100)

	ctx := context.Background()
	ch, err := bs.AllKeysChan(ctx)
	require.NoError(t, err)
	actual := collect(ch)

	require.ElementsMatch(t, keys, actual)
}

func (s *Suite) TestAllKeysRespectsContext(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_ALL_KEYS_CHAN_001
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	_ = insertBlocks(t, bs, 100)

	ctx, cancel := context.WithCancel(context.Background())
	ch, err := bs.AllKeysChan(ctx)
	require.NoError(t, err)

	// consume 2, then cancel context.
	v, ok := <-ch
	require.NotEqual(t, cid.Undef, v)
	require.True(t, ok)

	v, ok = <-ch
	require.NotEqual(t, cid.Undef, v)
	require.True(t, ok)

	cancel()
	// pull one value out to avoid race
	_, _ = <-ch

	v, ok = <-ch
	require.Equal(t, cid.Undef, v)
	require.False(t, ok)
}

func (s *Suite) TestDoubleClose(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	bs, _ := s.NewBlockstore(t)
	c, ok := bs.(io.Closer)
	if !ok {
		t.SkipNow()
	}
	require.NoError(t, c.Close())
	require.NoError(t, c.Close())
}

func (s *Suite) TestReopenPutGet(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_GET_001
	ctx := context.Background()
	bs, path := s.NewBlockstore(t)
	c, ok := bs.(io.Closer)
	if !ok {
		t.SkipNow()
	}

	orig := blocks.NewBlock([]byte("some data"))
	err := bs.Put(ctx, orig)
	require.NoError(t, err)

	err = c.Close()
	require.NoError(t, err)

	bs, err = s.OpenBlockstore(t, path)
	require.NoError(t, err)

	fetched, err := bs.Get(ctx, orig.Cid())
	require.NoError(t, err)
	require.Equal(t, orig.RawData(), fetched.RawData())

	err = bs.(io.Closer).Close()
	require.NoError(t, err)
}

func (s *Suite) TestPutMany(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001
	//stm: @SPLITSTORE_BADGER_HAS_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_GET_001, @SPLITSTORE_BADGER_PUT_MANY_001
	//stm: @SPLITSTORE_BADGER_ALL_KEYS_CHAN_001
	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	blks := []blocks.Block{
		blocks.NewBlock([]byte("foo1")),
		blocks.NewBlock([]byte("foo2")),
		blocks.NewBlock([]byte("foo3")),
	}
	err := bs.PutMany(ctx, blks)
	require.NoError(t, err)

	for _, blk := range blks {
		fetched, err := bs.Get(ctx, blk.Cid())
		require.NoError(t, err)
		require.Equal(t, blk.RawData(), fetched.RawData())

		ok, err := bs.Has(ctx, blk.Cid())
		require.NoError(t, err)
		require.True(t, ok)
	}

	ch, err := bs.AllKeysChan(context.Background())
	require.NoError(t, err)

	cids := collect(ch)
	require.Len(t, cids, 3)
}

func (s *Suite) TestDelete(t *testing.T) {
	//stm: @SPLITSTORE_BADGER_PUT_001, @SPLITSTORE_BADGER_POOLED_STORAGE_KEY_001
	//stm: @SPLITSTORE_BADGER_DELETE_001, @SPLITSTORE_BADGER_POOLED_STORAGE_HAS_001
	//stm: @SPLITSTORE_BADGER_ALL_KEYS_CHAN_001, @SPLITSTORE_BADGER_HAS_001
	//stm: @SPLITSTORE_BADGER_PUT_MANY_001

	ctx := context.Background()
	bs, _ := s.NewBlockstore(t)
	if c, ok := bs.(io.Closer); ok {
		defer func() { require.NoError(t, c.Close()) }()
	}

	blks := []blocks.Block{
		blocks.NewBlock([]byte("foo1")),
		blocks.NewBlock([]byte("foo2")),
		blocks.NewBlock([]byte("foo3")),
	}
	err := bs.PutMany(ctx, blks)
	require.NoError(t, err)

	err = bs.DeleteBlock(ctx, blks[1].Cid())
	require.NoError(t, err)

	ch, err := bs.AllKeysChan(context.Background())
	require.NoError(t, err)

	cids := collect(ch)
	require.Len(t, cids, 2)
	require.ElementsMatch(t, cids, []cid.Cid{
		cid.NewCidV1(cid.Raw, blks[0].Cid().Hash()),
		cid.NewCidV1(cid.Raw, blks[2].Cid().Hash()),
	})

	has, err := bs.Has(ctx, blks[1].Cid())
	require.NoError(t, err)
	require.False(t, has)
}

func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid {
	ctx := context.Background()
	keys := make([]cid.Cid, count)
	for i := 0; i < count; i++ {
		block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
		err := bs.Put(ctx, block)
		require.NoError(t, err)
		// NewBlock assigns a CIDv0; we convert it to CIDv1 because that's what
		// the store returns.
		keys[i] = cid.NewCidV1(cid.Raw, block.Multihash())
	}
	return keys
}

func collect(ch <-chan cid.Cid) []cid.Cid {
	var keys []cid.Cid
	for k := range ch {
		keys = append(keys, k)
	}
	return keys
}