Merge pull request #57 from filecoin-project/basic-deal-flow
baseline test: basic deal flow
This commit is contained in:
commit
fa224271a1
@ -14,5 +14,5 @@ RUN paramfetch 2048 /proof-parameters.json
|
||||
|
||||
FROM ubuntu:18.04
|
||||
|
||||
RUN apt-get update && apt-get install -y ca-certificates llvm clang mesa-opencl-icd ocl-icd-opencl-dev jq gcc pkg-config
|
||||
COPY --from=downloader /var/tmp/filecoin-proof-parameters /var/tmp/filecoin-proof-parameters
|
||||
RUN apt-get update && apt-get install -y ca-certificates llvm clang mesa-opencl-icd ocl-icd-opencl-dev jq gcc pkg-config net-tools netcat traceroute iputils-ping wget vim curl telnet iproute2 dnsutils
|
||||
COPY --from=downloader /var/tmp/filecoin-proof-parameters /var/tmp/filecoin-proof-parameters
|
||||
|
@ -1,5 +1,28 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
dag "github.com/ipfs/go-merkledag"
|
||||
dstest "github.com/ipfs/go-merkledag/test"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/ipld/go-car"
|
||||
)
|
||||
|
||||
// This is the basline test; Filecoin 101.
|
||||
//
|
||||
// A network with a bootstrapper, a number of miners, and a number of clients/full nodes
|
||||
@ -31,34 +54,244 @@ func runBaselineBootstrapper(t *TestEnvironment) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO just wait until completion of test, nothing else to do
|
||||
|
||||
ctx := context.Background()
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runBaselineMiner(t *TestEnvironment) error {
|
||||
t.RecordMessage("running miner")
|
||||
_, err := prepareMiner(t)
|
||||
miner, err := prepareMiner(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO wait a bit for network to bootstrap
|
||||
// TODO just wait until completion of test, serving requests -- the client does all the job
|
||||
ctx := context.Background()
|
||||
|
||||
clients := t.IntParam("clients")
|
||||
miners := t.IntParam("miners")
|
||||
|
||||
// mine / stop mining
|
||||
mine := true
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for mine {
|
||||
|
||||
// synchronize all miners to mine the next block
|
||||
t.RecordMessage("synchronizing all miners to mine next block")
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
|
||||
|
||||
time.Sleep(time.Duration(100 + rand.Intn(int(100*time.Millisecond))))
|
||||
|
||||
err := miner.MineOne(ctx, func(bool) {
|
||||
// after a block is mined
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for a signal from all clients to stop mining
|
||||
err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mine = false
|
||||
t.RecordMessage("shutting down mining")
|
||||
<-done
|
||||
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runBaselineClient(t *TestEnvironment) error {
|
||||
t.RecordMessage("running client")
|
||||
_, err := prepareClient(t)
|
||||
cl, err := prepareClient(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO generate a number of random "files" and publish them to one or more miners
|
||||
// TODO broadcast published content CIDs to other clients
|
||||
// TODO select a random piece of content published by some other client and retreieve it
|
||||
ctx := context.Background()
|
||||
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.RecordMessage("got %v miner addrs", len(addrs))
|
||||
|
||||
client := cl.fullApi
|
||||
|
||||
// select a random miner
|
||||
minerAddr := addrs[rand.Intn(len(addrs))]
|
||||
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// generate random data
|
||||
data := make([]byte, 1600)
|
||||
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
|
||||
|
||||
file, err := ioutil.TempFile("/tmp", "data")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.Remove(file.Name())
|
||||
|
||||
_, err = file.Write(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.RecordMessage("file cid: %s", fcid)
|
||||
|
||||
// start deal
|
||||
deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
|
||||
t.RecordMessage("started deal: %s", deal)
|
||||
|
||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
t.RecordMessage("waiting for deal to be sealed")
|
||||
waitDealSealed(t, ctx, client, deal)
|
||||
|
||||
carExport := true
|
||||
|
||||
t.RecordMessage("trying to retrieve %s", fcid)
|
||||
retrieveData(t, ctx, err, client, fcid, carExport, data)
|
||||
|
||||
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
|
||||
|
||||
// TODO broadcast published content CIDs to other clients
|
||||
// TODO select a random piece of content published by some other client and retrieve it
|
||||
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid {
|
||||
addr, err := client.WalletDefaultAddress(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
|
||||
Data: &storagemarket.DataRef{Root: fcid},
|
||||
Wallet: addr,
|
||||
Miner: minerActorAddr,
|
||||
EpochPrice: types.NewInt(1000000),
|
||||
MinBlocksDuration: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return deal
|
||||
}
|
||||
|
||||
func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) {
|
||||
loop:
|
||||
for {
|
||||
di, err := client.ClientGetDealInfo(ctx, *deal)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
switch di.State {
|
||||
case storagemarket.StorageDealProposalRejected:
|
||||
panic("deal rejected")
|
||||
case storagemarket.StorageDealFailing:
|
||||
panic("deal failed")
|
||||
case storagemarket.StorageDealError:
|
||||
panic(fmt.Sprintf("deal errored %s", di.Message))
|
||||
case storagemarket.StorageDealActive:
|
||||
t.RecordMessage("completed deal: %s", di)
|
||||
break loop
|
||||
}
|
||||
t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State])
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
|
||||
offers, err := client.ClientFindData(ctx, fcid)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(offers) < 1 {
|
||||
panic("no offers")
|
||||
}
|
||||
|
||||
rpath, err := ioutil.TempDir("", "lotus-retrieve-test-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer os.RemoveAll(rpath)
|
||||
|
||||
caddr, err := client.WalletDefaultAddress(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ref := &api.FileRef{
|
||||
Path: filepath.Join(rpath, "ret"),
|
||||
IsCAR: carExport,
|
||||
}
|
||||
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if carExport {
|
||||
rdata = extractCarData(ctx, rdata, rpath)
|
||||
}
|
||||
|
||||
if !bytes.Equal(rdata, data) {
|
||||
panic("wrong data retrieved")
|
||||
}
|
||||
|
||||
t.RecordMessage("retrieved successfully")
|
||||
}
|
||||
|
||||
func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
|
||||
bserv := dstest.Bserv()
|
||||
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
b, err := bserv.GetBlock(ctx, ch.Roots[0])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
nd, err := ipld.Decode(b)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dserv := dag.NewDAGService(bserv)
|
||||
fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
outPath := filepath.Join(rpath, "retLoadedCAR")
|
||||
if err := files.WriteTo(fil, outPath); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rdata, err = ioutil.ReadFile(outPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return rdata
|
||||
}
|
||||
|
64
lotus-soup/compositions/composition-k8s.toml
Normal file
64
lotus-soup/compositions/composition-k8s.toml
Normal file
@ -0,0 +1,64 @@
|
||||
[metadata]
|
||||
name = "lotus-soup"
|
||||
author = ""
|
||||
|
||||
[global]
|
||||
plan = "lotus-soup"
|
||||
case = "lotus-baseline"
|
||||
total_instances = 3
|
||||
builder = "docker:go"
|
||||
runner = "cluster:k8s"
|
||||
|
||||
[global.build_config]
|
||||
push_registry=true
|
||||
go_proxy_mode="remote"
|
||||
go_proxy_url="http://localhost:8081"
|
||||
registry_type="aws"
|
||||
|
||||
[[groups]]
|
||||
id = "bootstrapper"
|
||||
[groups.resources]
|
||||
memory = "512Mi"
|
||||
cpu = "1000m"
|
||||
[groups.instances]
|
||||
count = 1
|
||||
percentage = 0.0
|
||||
[groups.run]
|
||||
[groups.run.test_params]
|
||||
role = "bootstrapper"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
sectors = "10"
|
||||
|
||||
[[groups]]
|
||||
id = "miners"
|
||||
[groups.resources]
|
||||
memory = "1024Mi"
|
||||
cpu = "1000m"
|
||||
[groups.instances]
|
||||
count = 1
|
||||
percentage = 0.0
|
||||
[groups.run]
|
||||
[groups.run.test_params]
|
||||
role = "miner"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
sectors = "10"
|
||||
|
||||
[[groups]]
|
||||
id = "clients"
|
||||
[groups.resources]
|
||||
memory = "1024Mi"
|
||||
cpu = "1000m"
|
||||
[groups.instances]
|
||||
count = 1
|
||||
percentage = 0.0
|
||||
[groups.run]
|
||||
[groups.run.test_params]
|
||||
role = "client"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
sectors = "10"
|
@ -22,7 +22,7 @@
|
||||
role = "bootstrapper"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
balance = "2000000000"
|
||||
sectors = "10"
|
||||
|
||||
[[groups]]
|
||||
@ -38,7 +38,7 @@
|
||||
role = "miner"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
balance = "2000000000"
|
||||
sectors = "10"
|
||||
|
||||
[[groups]]
|
||||
@ -54,5 +54,5 @@
|
||||
role = "client"
|
||||
clients = "1"
|
||||
miners = "1"
|
||||
balance = "2000"
|
||||
balance = "2000000000"
|
||||
sectors = "10"
|
||||
|
@ -3,12 +3,20 @@ module github.com/filecoin-project/oni/lotus-soup
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
|
||||
github.com/filecoin-project/go-fil-markets v0.3.0
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
|
||||
github.com/filecoin-project/lotus v0.4.1-0.20200623104442-68d38eff33e4
|
||||
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121
|
||||
github.com/ipfs/go-cid v0.0.6
|
||||
github.com/ipfs/go-datastore v0.4.4
|
||||
github.com/ipfs/go-ipfs-files v0.0.8
|
||||
github.com/ipfs/go-ipld-format v0.2.0
|
||||
github.com/ipfs/go-log/v2 v2.1.2-0.20200609205458-f8d20c392cb7
|
||||
github.com/ipfs/go-merkledag v0.3.1
|
||||
github.com/ipfs/go-unixfs v0.2.4
|
||||
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
|
||||
github.com/libp2p/go-libp2p-core v0.6.0
|
||||
github.com/multiformats/go-multiaddr v0.2.2
|
||||
github.com/testground/sdk-go v0.2.3-0.20200617132925-2e4d69f9ba38
|
||||
|
@ -4,26 +4,15 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"os"
|
||||
|
||||
//"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/testground/sdk-go/run"
|
||||
"github.com/testground/sdk-go/runtime"
|
||||
"github.com/testground/sdk-go/sync"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-storedcounter"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
@ -32,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||
"github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
@ -39,16 +29,38 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/lp2p"
|
||||
modtest "github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/testground/sdk-go/run"
|
||||
"github.com/testground/sdk-go/runtime"
|
||||
"github.com/testground/sdk-go/sync"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logging.SetLogLevel("*", "WARN")
|
||||
|
||||
os.Setenv("BELLMAN_NO_GPU", "1")
|
||||
|
||||
build.InsecurePoStValidation = true
|
||||
build.DisableBuiltinAssets = true
|
||||
|
||||
power.ConsensusMinerMinPower = big.NewInt(2048)
|
||||
saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
|
||||
abi.RegisteredSealProof_StackedDrg2KiBV1: {},
|
||||
}
|
||||
verifreg.MinVerifiedDealSize = big.NewInt(256)
|
||||
}
|
||||
|
||||
var (
|
||||
PrepareNodeTimeout = time.Minute
|
||||
|
||||
@ -56,7 +68,13 @@ var (
|
||||
balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
|
||||
presealTopic = sync.NewTopic("preseal", &PresealMsg{})
|
||||
|
||||
stateReady = sync.State("ready")
|
||||
clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{})
|
||||
minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddresses{})
|
||||
|
||||
stateReady = sync.State("ready")
|
||||
stateDone = sync.State("done")
|
||||
stateMineNext = sync.State("mine-next")
|
||||
stateStopMining = sync.State("stop-mining")
|
||||
)
|
||||
|
||||
type TestEnvironment struct {
|
||||
@ -68,6 +86,7 @@ type Node struct {
|
||||
fullApi api.FullNode
|
||||
minerApi api.StorageMiner
|
||||
stop node.StopFunc
|
||||
MineOne func(context.Context, func(bool)) error
|
||||
}
|
||||
|
||||
type InitialBalanceMsg struct {
|
||||
@ -84,17 +103,9 @@ type GenesisMsg struct {
|
||||
Bootstrapper []byte
|
||||
}
|
||||
|
||||
func init() {
|
||||
logging.SetLogLevel("vm", "WARN")
|
||||
|
||||
build.DisableBuiltinAssets = true
|
||||
|
||||
// Note: I don't understand the significance of this, but the node test does it.
|
||||
power.ConsensusMinerMinPower = big.NewInt(2048)
|
||||
saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
|
||||
abi.RegisteredSealProof_StackedDrg2KiBV1: {},
|
||||
}
|
||||
verifreg.MinVerifiedDealSize = big.NewInt(256)
|
||||
type MinerAddresses struct {
|
||||
PeerAddr peer.AddrInfo
|
||||
ActorAddr address.Address
|
||||
}
|
||||
|
||||
func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
||||
@ -138,7 +149,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
||||
genesisTemplate := genesis.Template{
|
||||
Accounts: genesisActors,
|
||||
Miners: genesisMiners,
|
||||
Timestamp: uint64(time.Now().Unix() - 1000), // this needs to be in the past
|
||||
Timestamp: uint64(time.Now().Unix() - 100000), // this needs to be in the past
|
||||
}
|
||||
|
||||
// dump the genesis block
|
||||
@ -172,7 +183,6 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
||||
}
|
||||
n.stop = stop
|
||||
|
||||
// this dance to construct the bootstrapper multiaddr is quite vexing.
|
||||
var bootstrapperAddr ma.Multiaddr
|
||||
|
||||
bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
||||
@ -272,7 +282,6 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
||||
// prepare the repo
|
||||
minerRepo := repo.NewMemory(nil)
|
||||
|
||||
// V00D00 People DaNC3!
|
||||
lr, err := minerRepo.Lock(repo.StorageMiner)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -345,11 +354,13 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mineBlock := make(chan func(bool))
|
||||
stop2, err := node.New(context.Background(),
|
||||
node.StorageMiner(&n.minerApi),
|
||||
node.Online(),
|
||||
node.Repo(minerRepo),
|
||||
node.Override(new(api.FullNode), n.fullApi),
|
||||
node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr)),
|
||||
withMinerListenAddress(minerIP),
|
||||
)
|
||||
if err != nil {
|
||||
@ -366,6 +377,25 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
||||
return err1
|
||||
}
|
||||
|
||||
remoteAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = n.minerApi.NetConnect(ctx, remoteAddrs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
n.MineOne = func(ctx context.Context, cb func(bool)) error {
|
||||
select {
|
||||
case mineBlock <- cb:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// add local storage for presealed sectors
|
||||
err = n.minerApi.StorageAddLocal(ctx, presealDir)
|
||||
if err != nil {
|
||||
@ -395,6 +425,17 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.RecordMessage("publish our address to the miners addr topic")
|
||||
actoraddress, err := n.minerApi.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addrinfo, err := n.minerApi.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddresses{addrinfo, actoraddress})
|
||||
|
||||
t.RecordMessage("waiting for all nodes to be ready")
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||
|
||||
@ -447,6 +488,13 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.RecordMessage("publish our address to the clients addr topic")
|
||||
addrinfo, err := n.fullApi.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo)
|
||||
|
||||
t.RecordMessage("waiting for all nodes to be ready")
|
||||
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||
|
||||
@ -554,3 +602,37 @@ func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error
|
||||
return nil, fmt.Errorf("error while waiting for genesis msg: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) {
|
||||
ch := make(chan MinerAddresses)
|
||||
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch)
|
||||
|
||||
addrs := make([]MinerAddresses, 0, miners)
|
||||
for i := 0; i < miners; i++ {
|
||||
select {
|
||||
case a := <-ch:
|
||||
addrs = append(addrs, a)
|
||||
case err := <-sub.Done():
|
||||
return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
|
||||
ch := make(chan peer.AddrInfo)
|
||||
sub := t.SyncClient.MustSubscribe(ctx, clientsAddrsTopic, ch)
|
||||
|
||||
addrs := make([]peer.AddrInfo, 0, clients)
|
||||
for i := 0; i < clients; i++ {
|
||||
select {
|
||||
case a := <-ch:
|
||||
addrs = append(addrs, a)
|
||||
case err := <-sub.Done():
|
||||
return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user