package sectorstorage

import (
	"bytes"
	"context"
	"io/ioutil"
	"math/rand"
	"net"
	"net/http"
	"testing"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-statestore"
	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
	specstorage "github.com/filecoin-project/specs-storage/storage"
	"github.com/gorilla/mux"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	ds_sync "github.com/ipfs/go-datastore/sync"
	logging "github.com/ipfs/go-log/v2"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

// TestPieceProviderSimpleNoRemoteWorker verifies that the ReadPiece method
// works correctly when the miner handles everything itself and does NOT use
// any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
	// Set up sector storage manager
	sealerCfg := SealerConfig{
		ParallelFetchLimit: 10,
		AllowAddPiece:      true,
		AllowPreCommit1:    true,
		AllowPreCommit2:    true,
		AllowCommit:        true,
		AllowUnseal:        true,
	}
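	// with every task type allowed, the miner performs sealing and unsealing
	// itself; no remote workers are registered in this test.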

	ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
	defer ppt.shutdown(t)

	// Create some unpadded piece data that aligns with the padding/piece boundaries.
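	// An 8MiB sector holds 8MiB * 127/128 = 8,323,072 unpadded bytes, since
	// fr32 padding expands every 127 data bytes to 128 padded bytes;
	// 8 * 127 * 1024 * 8 is exactly that size, so this piece fills the sector.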
	pieceData := generatePieceData(8 * 127 * 1024 * 8)
	size := abi.UnpaddedPieceSize(len(pieceData))
	ppt.addPiece(t, pieceData)

	// read piece
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
		false, pieceData)

	// pre-commit 1
	preCommit1 := ppt.preCommit1(t)

	// check if IsUnsealed -> true
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
	// read piece
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
		false, pieceData)

	// pre-commit 2
	ppt.preCommit2(t, preCommit1)

	// check if IsUnsealed -> true
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
	// read piece
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
		false, pieceData)

	// finalize the sector -> passing nil for keepUnsealed will remove the unsealed file
	ppt.finalizeSector(t, nil)

	// check if IsUnsealed -> false
	require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
	// Read the piece -> will have to unseal
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
		true, pieceData)

	// check if IsUnsealed -> true
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
	// read the piece -> will not have to unseal
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
		false, pieceData)

}
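
// TestReadPieceRemoteWorkers verifies that ReadPiece works correctly when the
// miner can only add pieces and sealing, unsealing, and fetching are all
// delegated to remote workers.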
func TestReadPieceRemoteWorkers(t *testing.T) {
	logging.SetAllLoggers(logging.LevelDebug)

	// the miner's own worker can only add pieces to an unsealed sector.
	sealerCfg := SealerConfig{
		ParallelFetchLimit: 10,
		AllowAddPiece:      true,
		AllowPreCommit1:    false,
		AllowPreCommit2:    false,
		AllowCommit:        false,
		AllowUnseal:        false,
	}
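	// every other task type must therefore be scheduled on a remote worker.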

	// test harness for an 8MiB sector.
	ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
	defer ppt.shutdown(t)

	// worker 2 (the first remote worker) will ONLY help with the sealing,
	// first fetching the unsealed file from the miner.
	ppt.addRemoteWorker(t, []sealtasks.TaskType{
		sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
		sealtasks.TTFetch, sealtasks.TTFinalize,
	})

	// worker 3 (the second remote worker) can ONLY unseal and fetch.
	ppt.addRemoteWorker(t, []sealtasks.TaskType{
		sealtasks.TTUnseal, sealtasks.TTFetch,
	})
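	// to unseal, this worker will have to fetch the sealed and cache files from
	// whichever store holds them once the sector is finalized.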

	// run the test

	// add two pieces, each half the sector, that align with the padding/piece boundaries.
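	// 8 * 127 * 4 * 1024 = 4,161,536 unpadded bytes is exactly half of the
	// sector's 8,323,072 unpadded bytes, so the two pieces fill the sector.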
	pd1 := generatePieceData(8 * 127 * 4 * 1024)
	pi1 := ppt.addPiece(t, pd1)
	pd1size := pi1.Size.Unpadded()

	pd2 := generatePieceData(8 * 127 * 4 * 1024)
	pi2 := ppt.addPiece(t, pd2)
	pd2size := pi2.Size.Unpadded()

	// pre-commit 1
	pC1 := ppt.preCommit1(t)

	// check if IsUnsealed -> true
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
	// Read the piece -> no need to unseal
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
		false, pd1)

	// pre-commit 2
	ppt.preCommit2(t, pC1)

	// check if IsUnsealed -> true
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
	// Read the piece -> no need to unseal
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
		false, pd1)

	// finalize the sector to declare the sealed file to the index so that the
	// unsealing worker can later look it up and fetch it if needed.
	// Passing nil here removes all unsealed files once the sector is finalized.
	ppt.finalizeSector(t, nil)

	// check if IsUnsealed -> false
	require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
	// Read the piece -> have to unseal since we removed the file.
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
		true, pd1)

	// Read the same piece again -> will NOT have to unseal.
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1)

	// remove the unsealed file and read again -> will have to unseal.
	ppt.removeAllUnsealedSectorFiles(t)
	// check if IsUnsealed -> false
	require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
	ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
		true, pd1)

	// check if IsUnsealed -> true for piece 2, which starts at unpadded offset pd1size
	require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
	// Read Piece 2 -> no unsealing as it got unsealed above.
	ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)

	// remove all unseal files -> Read Piece 2 -> will have to Unseal.
	ppt.removeAllUnsealedSectorFiles(t)

	// check if IsUnsealed -> false
	require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
	ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
}

// pieceProviderTestHarness bundles everything the tests need: the manager
// acting as the miner, its index and stores, the PieceProvider under test,
// and bookkeeping for the http servers and added pieces.
type pieceProviderTestHarness struct {
	ctx         context.Context
	index       *stores.Index
	pp          PieceProvider
	sector      specstorage.SectorRef
	mgr         *Manager
	ticket      abi.SealRandomness
	commD       cid.Cid
	localStores []*stores.Local

	servers []*http.Server

	addedPieces []abi.PieceInfo
}

// generatePieceData returns size bytes of pseudo-random data to use as piece input.
func generatePieceData(size uint64) []byte {
	bz := make([]byte, size)
	rand.Read(bz)
	return bz
}

// newPieceProviderTestHarness wires up a sector storage manager, an http
// endpoint serving its sector files, and a PieceProvider reading through it.
func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
	ctx := context.Background()
	// listen on tcp socket to create an http server later
	address := "0.0.0.0:0"
	nl, err := net.Listen("tcp", address)
	require.NoError(t, err)

	// create index, storage, local store & remote store.
	index := stores.NewIndex()
	storage := newTestStorage(t)
	localStore, err := stores.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
	require.NoError(t, err)
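	// the remote store wraps the local store and can fetch files it is missing
	// from other sector stores over http; the partial file handler is used to
	// read ranges out of partial (unsealed) sector files.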
	remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})

	// data stores for state tracking.
	dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
	wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
	smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))

	mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts)
	require.NoError(t, err)

	// start an http server on the manager to serve sector file requests.
	svc := &http.Server{
		Addr:    nl.Addr().String(),
		Handler: mgr,
	}
	go func() {
		_ = svc.Serve(nl)
	}()

	// the piece provider reads unsealed data via the remote store, falling back
	// to unsealing through the manager when no unsealed copy exists.
	pp := NewPieceProvider(remoteStore, index, mgr)

	sector := specstorage.SectorRef{
		ID: abi.SectorID{
			Miner:  100,
			Number: 10,
		},
		ProofType: sectorProofType,
	}

	// fixed, arbitrary ticket; sealing and any later unsealing must use the same value.
	ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}

	ppt := &pieceProviderTestHarness{
		ctx:    ctx,
		index:  index,
		pp:     pp,
		sector: sector,
		mgr:    mgr,
		ticket: ticket,
	}
	ppt.servers = append(ppt.servers, svc)
	ppt.localStores = append(ppt.localStores, localStore)
	return ppt
}

// addRemoteWorker creates a local worker restricted to the given task types,
// serves its store over http, and registers it with the harness manager.
func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtasks.TaskType) {
	// listen on a tcp socket to create an http server later
	address := "0.0.0.0:0"
	nl, err := net.Listen("tcp", address)
	require.NoError(t, err)

	localStore, err := stores.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"})
	require.NoError(t, err)

	fh := &stores.FetchHandler{
		Local:     localStore,
		PfHandler: &stores.DefaultPartialFileHandler{},
	}

	// route /remote to the fetch handler so the miner and other workers can
	// fetch sector files from this worker over http.
	mux := mux.NewRouter()
	mux.PathPrefix("/remote").HandlerFunc(fh.ServeHTTP)
	svc := &http.Server{
		Addr:    nl.Addr().String(),
		Handler: mux,
	}

	go func() {
		_ = svc.Serve(nl)
	}()

	remote := stores.NewRemote(localStore, p.index, nil, 1000,
		&stores.DefaultPartialFileHandler{})

	dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
	csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))

	worker := newLocalWorker(nil, WorkerConfig{
		TaskTypes: tasks,
	}, remote, localStore, p.index, p.mgr, csts)

	p.servers = append(p.servers, svc)
	p.localStores = append(p.localStores, localStore)

	// register the worker with the manager
	require.NoError(t, p.mgr.AddWorker(p.ctx, worker))
}

// removeAllUnsealedSectorFiles deletes the unsealed sector file from every
// local store known to the harness (the miner's and the workers').
func (p *pieceProviderTestHarness) removeAllUnsealedSectorFiles(t *testing.T) {
	for i := range p.localStores {
		ls := p.localStores[i]
		require.NoError(t, ls.Remove(p.ctx, p.sector.ID, storiface.FTUnsealed, false))
	}
}

// addPiece writes pieceData into the sector via the manager, passing the sizes
// of the previously added pieces so the new piece lands at the right offset.
func (p *pieceProviderTestHarness) addPiece(t *testing.T, pieceData []byte) abi.PieceInfo {
	// AddPiece needs the unpadded sizes of the pieces already in the sector.
	var existing []abi.UnpaddedPieceSize
	for _, pi := range p.addedPieces {
		existing = append(existing, pi.Size.Unpadded())
	}

	size := abi.UnpaddedPieceSize(len(pieceData))
	pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, bytes.NewReader(pieceData))
	require.NoError(t, err)

	p.addedPieces = append(p.addedPieces, pieceInfo)
	return pieceInfo
}

// preCommit1 runs SealPreCommit1 over all pieces added to the sector so far.
func (p *pieceProviderTestHarness) preCommit1(t *testing.T) specstorage.PreCommit1Out {
	preCommit1, err := p.mgr.SealPreCommit1(p.ctx, p.sector, p.ticket, p.addedPieces)
	require.NoError(t, err)
	return preCommit1
}

// preCommit2 runs SealPreCommit2 and records the resulting unsealed CID
// (commD), which ReadPiece later needs for unsealing.
func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreCommit1Out) {
	sectorCids, err := p.mgr.SealPreCommit2(p.ctx, p.sector, pc1)
	require.NoError(t, err)
	p.commD = sectorCids.Unsealed
}

// isUnsealed reports whether the given unpadded byte range of the sector is
// available in an unsealed file.
func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool {
	b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size)
	require.NoError(t, err)
	return b
}

// readPiece reads the given range through the PieceProvider and asserts that
// the bytes match expectedBytes and that unsealing happened exactly when
// expectedHadToUnseal says it should.
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
	expectedHadToUnseal bool, expectedBytes []byte) {
	// the boolean returned by ReadPiece reports whether the read had to unseal the data first.
	rd, hadToUnseal, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
	require.NoError(t, err)
	require.NotNil(t, rd)
	require.Equal(t, expectedHadToUnseal, hadToUnseal)
	defer func() { _ = rd.Close() }()

	// Make sure the input matches the output
	readData, err := ioutil.ReadAll(rd)
	require.NoError(t, err)
	require.Equal(t, expectedBytes, readData)
}

// finalizeSector finalizes the sector; ranges in keepUnseal stay unsealed,
// and a nil keepUnseal removes all unsealed files.
func (p *pieceProviderTestHarness) finalizeSector(t *testing.T, keepUnseal []specstorage.Range) {
	require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector, keepUnseal))
}

// shutdown gracefully stops the http servers started by the harness.
func (p *pieceProviderTestHarness) shutdown(t *testing.T) {
	for _, svc := range p.servers {
		require.NoError(t, svc.Shutdown(p.ctx))
	}
}