// stm: #unit
package dagstore

import (
	"bytes"
	"context"
	"os"
	"testing"
	"time"

	"github.com/ipfs/go-cid"
	carindex "github.com/ipld/go-car/v2/index"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
	mh "github.com/multiformats/go-multihash"
	"github.com/stretchr/testify/require"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/dagstore"
	"github.com/filecoin-project/dagstore/mount"
	"github.com/filecoin-project/dagstore/shard"

	"github.com/filecoin-project/lotus/node/config"
)

// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
// error, the wrapper will attempt to register the shard then reacquire
func TestWrapperAcquireRecoveryDestroy(t *testing.T) {
	ctx := context.Background()
	pieceCid, err := cid.Parse("bafkqaaa")
	require.NoError(t, err)

	h, err := mocknet.New().GenPeer()
	require.NoError(t, err)
	// Create a DAG store wrapper
	dagst, w, err := NewDAGStore(config.DAGStoreConfig{
		RootDir:    t.TempDir(),
		GCInterval: config.Duration(1 * time.Millisecond),
	}, mockLotusMount{}, h)
	require.NoError(t, err)

	defer dagst.Close() //nolint:errcheck

	// Return an error from acquire shard the first time
	acquireShardErr := make(chan error, 1)
	acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown)

	// Create a mock DAG store in place of the real DAG store
	mock := &mockDagStore{
		acquireShardErr: acquireShardErr,
		acquireShardRes: dagstore.ShardResult{
			Accessor: getShardAccessor(t),
		},
		register: make(chan shard.Key, 1),
		destroy:  make(chan shard.Key, 1),
	}
	w.dagst = mock

	//stm: @MARKET_DAGSTORE_ACQUIRE_SHARD_002
	mybs, err := w.LoadShard(ctx, pieceCid)
	require.NoError(t, err)

	// Expect the wrapper to try to recover from the error returned from
	// acquire shard by calling register shard with the same key
	tctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	select {
	case <-tctx.Done():
		require.Fail(t, "failed to call register")
	case k := <-mock.register:
		require.Equal(t, k.String(), pieceCid.String())
	}

	// Verify that we can get things from the acquired blockstore
	var count int
	ch, err := mybs.AllKeysChan(ctx)
	require.NoError(t, err)
	for range ch {
		count++
	}
	require.Greater(t, count, 0)

	// Destroy the shard
	dr := make(chan dagstore.ShardResult, 1)
	err = w.DestroyShard(ctx, pieceCid, dr)
	require.NoError(t, err)

	dctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	select {
	case <-dctx.Done():
		require.Fail(t, "failed to call destroy")
	case k := <-mock.destroy:
		require.Equal(t, k.String(), pieceCid.String())
	}

	var dcount int
	dch, err := mybs.AllKeysChan(ctx)
	require.NoError(t, err)
	for range dch {
		count++
	}
	require.Equal(t, dcount, 0)
}

// TestWrapperBackground verifies the behaviour of the background go routine
func TestWrapperBackground(t *testing.T) {
	ctx := context.Background()
	h, err := mocknet.New().GenPeer()
	require.NoError(t, err)

	// Create a DAG store wrapper
	dagst, w, err := NewDAGStore(config.DAGStoreConfig{
		RootDir:    t.TempDir(),
		GCInterval: config.Duration(1 * time.Millisecond),
	}, mockLotusMount{}, h)
	require.NoError(t, err)

	defer dagst.Close() //nolint:errcheck

	// Create a mock DAG store in place of the real DAG store
	mock := &mockDagStore{
		gc:      make(chan struct{}, 1),
		recover: make(chan shard.Key, 1),
		close:   make(chan struct{}, 1),
	}
	w.dagst = mock

	// Start up the wrapper
	//stm: @MARKET_DAGSTORE_START_001
	err = w.Start(ctx)
	require.NoError(t, err)

	// Expect GC to be called automatically
	//stm: @MARKET_DAGSTORE_START_002
	tctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	select {
	case <-tctx.Done():
		require.Fail(t, "failed to call GC")
	case <-mock.gc:
	}

	// Expect that when the wrapper is closed it will call close on the
	// DAG store
	//stm: @MARKET_DAGSTORE_CLOSE_001
	err = w.Close()
	require.NoError(t, err)

	tctx, cancel3 := context.WithTimeout(ctx, time.Second)
	defer cancel3()
	select {
	case <-tctx.Done():
		require.Fail(t, "failed to call close")
	case <-mock.close:
	}
}

type mockDagStore struct {
	acquireShardErr chan error
	acquireShardRes dagstore.ShardResult
	register        chan shard.Key

	gc      chan struct{}
	recover chan shard.Key
	destroy chan shard.Key
	close   chan struct{}
}

func (m *mockDagStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error) {
	return nil, nil
}

func (m *mockDagStore) ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error) {
	return nil, nil
}

func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) {
	panic("implement me")
}

func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error {
	m.destroy <- key
	out <- dagstore.ShardResult{Key: key}
	return nil
}

func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) {
	panic("implement me")
}

func (m *mockDagStore) AllShardsInfo() dagstore.AllShardsInfo {
	panic("implement me")
}

func (m *mockDagStore) Start(_ context.Context) error {
	return nil
}

func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error {
	m.register <- key
	out <- dagstore.ShardResult{Key: key}
	return nil
}

func (m *mockDagStore) AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error {
	select {
	case err := <-m.acquireShardErr:
		return err
	default:
	}

	out <- m.acquireShardRes
	return nil
}

func (m *mockDagStore) RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error {
	m.recover <- key
	return nil
}

func (m *mockDagStore) GC(ctx context.Context) (*dagstore.GCResult, error) {
	select {
	case m.gc <- struct{}{}:
	default:
	}

	return nil, nil
}

func (m *mockDagStore) Close() error {
	m.close <- struct{}{}
	return nil
}

type mockLotusMount struct {
}

func (m mockLotusMount) Start(ctx context.Context) error {
	return nil
}

func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) {
	panic("implement me")
}

func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
	panic("implement me")
}

func (m mockLotusMount) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
	panic("implement me")
}

func getShardAccessor(t *testing.T) *dagstore.ShardAccessor {
	data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car")
	require.NoError(t, err)
	buff := bytes.NewReader(data)
	reader := &mount.NopCloser{Reader: buff, ReaderAt: buff, Seeker: buff}
	shardAccessor, err := dagstore.NewShardAccessor(reader, nil, nil)
	require.NoError(t, err)
	return shardAccessor
}