package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"sync"
	"time"

	"github.com/filecoin-project/lotus/api"
	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/testplans/lotus-soup/testkit"
)

// dealsStress is the client side of the deals stress test plan.
//
// Instances whose role is not "client" are handed to testkit.HandleDefaultRole.
// A client instance:
//
//  1. prepares a client node and connects it to one randomly chosen miner;
//  2. generates `deals` (int test parameter) payloads of 1600 random bytes,
//     writes each to a temp file under /tmp and imports it into the node;
//  3. starts one storage deal per payload and waits for it to be sealed,
//     concurrently unless the `deal_mode` parameter is "serial";
//  4. retrieves every payload again (same concurrency mode);
//  5. signals StateStopMining, waits for all instances to reach StateDone,
//     and sleeps so pending metrics can be emitted.
//
// It returns the first setup error it hits (client preparation, connecting,
// temp-file handling, import). Retrieval failures do not abort the run: they
// are recorded per deal, the coordination signals are still sent so the other
// instances are not left waiting, and a summary error is returned at the end.
func dealsStress(t *testkit.TestEnvironment) error {
	// Dispatch/forward non-client roles to defaults.
	if t.Role != "client" {
		return testkit.HandleDefaultRole(t)
	}

	t.RecordMessage("running client")

	cl, err := testkit.PrepareClient(t)
	if err != nil {
		return err
	}

	ctx := context.Background()
	client := cl.FullApi

	// rand's Intn panics when its argument is zero, so report a missing miner
	// as a regular error instead.
	if len(cl.MinerAddrs) == 0 {
		return fmt.Errorf("no miner addresses available to make deals with")
	}

	// One time-seeded generator serves both the miner selection and the
	// payload generation. The package-level math/rand functions are seeded
	// deterministically on Go versions before 1.20, which would make the
	// "random" miner the same on every run unless something else seeds them.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// select a random miner
	minerAddr := cl.MinerAddrs[rng.Intn(len(cl.MinerAddrs))]
	if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil {
		return err
	}

	t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr)

	// NOTE(review): fixed settle delay after connecting; the reason is not
	// visible in this file.
	time.Sleep(12 * time.Second)

	// prepare a number of concurrent data points
	deals := t.IntParam("deals")
	data := make([][]byte, 0, deals)
	cids := make([]cid.Cid, 0, deals)

	for i := 0; i < deals; i++ {
		dealData := make([]byte, 1600)
		rng.Read(dealData)

		dealFile, err := ioutil.TempFile("/tmp", "data")
		if err != nil {
			return err
		}
		// Removal is intentionally deferred to function exit so the file
		// stays on disk for the whole run (presumably the node may still
		// read it after the import — TODO confirm).
		defer os.Remove(dealFile.Name())

		// Close the handle as soon as the payload is written: only the path
		// is used from here on, and keeping one descriptor open per deal
		// leaks file descriptors in a stress run.
		_, err = dealFile.Write(dealData)
		if cerr := dealFile.Close(); err == nil {
			err = cerr
		}
		if err != nil {
			return err
		}

		dealCid, err := client.ClientImport(ctx, api.FileRef{Path: dealFile.Name(), IsCAR: false})
		if err != nil {
			return err
		}

		t.RecordMessage("deal %d file cid: %s", i, dealCid)

		data = append(data, dealData)
		cids = append(cids, dealCid.Root)
	}

	// Deals run concurrently unless the plan explicitly asks for serial mode.
	concurrentDeals := t.StringParam("deal_mode") != "serial"

	// Short pause before starting deals to avoid "failed to get block" errors.
	time.Sleep(2 * time.Second)

	// One slot per deal; each slot is written by at most one goroutine, so no
	// extra locking is needed. Slots are read only after the writers finished.
	retrieveErrs := make([]error, deals)

	t.RecordMessage("starting storage deals")
	if concurrentDeals {

		var wg1 sync.WaitGroup
		for i := 0; i < deals; i++ {
			wg1.Add(1)
			go func(i int) {
				defer wg1.Done()
				t1 := time.Now()
				deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i], false)
				t.RecordMessage("started storage deal %d -> %s", i, deal)
				time.Sleep(2 * time.Second)
				t.RecordMessage("waiting for deal %d to be sealed", i)
				testkit.WaitDealSealed(t, ctx, client, deal)
				// Time from starting the deal until it is reported sealed.
				t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.MinerActorAddr)).Update(int64(time.Since(t1)))
			}(i)
		}
		t.RecordMessage("waiting for all deals to be sealed")
		wg1.Wait()
		t.RecordMessage("all deals sealed; starting retrieval")

		var wg2 sync.WaitGroup
		for i := 0; i < deals; i++ {
			wg2.Add(1)
			go func(i int) {
				defer wg2.Done()
				t.RecordMessage("retrieving data for deal %d", i)
				t1 := time.Now()
				if err := testkit.RetrieveData(t, ctx, client, cids[i], nil, true, data[i]); err != nil {
					// Keep going with the other deals, but do not count a
					// failed retrieval in the latency histogram.
					t.RecordMessage("retrieval for deal %d failed: %s", i, err)
					retrieveErrs[i] = err
					return
				}

				t.RecordMessage("retrieved data for deal %d", i)
				t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
			}(i)
		}
		t.RecordMessage("waiting for all retrieval deals to complete")
		wg2.Wait()

	} else {

		for i := 0; i < deals; i++ {
			deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i], false)
			t.RecordMessage("started storage deal %d -> %s", i, deal)
			time.Sleep(2 * time.Second)
			t.RecordMessage("waiting for deal %d to be sealed", i)
			testkit.WaitDealSealed(t, ctx, client, deal)
		}

		for i := 0; i < deals; i++ {
			t.RecordMessage("retrieving data for deal %d", i)
			if err := testkit.RetrieveData(t, ctx, client, cids[i], nil, true, data[i]); err != nil {
				t.RecordMessage("retrieval for deal %d failed: %s", i, err)
				retrieveErrs[i] = err
				continue
			}
			t.RecordMessage("retrieved data for deal %d", i)
		}
	}

	// Summarize retrieval outcomes instead of silently discarding them.
	var firstErr error
	failed := 0
	for _, rerr := range retrieveErrs {
		if rerr == nil {
			continue
		}
		failed++
		if firstErr == nil {
			firstErr = rerr
		}
	}
	if failed == 0 {
		t.RecordMessage("all retrieval deals successful")
	}

	// Always run the coordination steps, even after retrieval failures, so
	// the remaining instances are told to stop and can reach the done state.
	t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining)
	t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount)

	time.Sleep(15 * time.Second) // wait for metrics to be emitted

	if failed > 0 {
		return fmt.Errorf("%d of %d retrievals failed; first error: %w", failed, deals, firstErr)
	}
	return nil
}