refactor lotus recipes to make them more manageable. (#86)
- Each role now has its own file (role_*.go). - Options have their own file (lotus_opts.go). - Sync service constructions have their own file (sync.go). - Utilities are functionally grouped in files ({deals,retrieval}.go).
This commit is contained in:
parent
035bf8b7db
commit
7f3716504b
@ -1,253 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"math/rand"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
files "github.com/ipfs/go-ipfs-files"
|
|
||||||
ipld "github.com/ipfs/go-ipld-format"
|
|
||||||
dag "github.com/ipfs/go-merkledag"
|
|
||||||
dstest "github.com/ipfs/go-merkledag/test"
|
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
|
||||||
"github.com/ipld/go-car"
|
|
||||||
)
|
|
||||||
|
|
||||||
// This is the basline test; Filecoin 101.
|
|
||||||
//
|
|
||||||
// A network with a bootstrapper, a number of miners, and a number of clients/full nodes
|
|
||||||
// is constructed and connected through the bootstrapper.
|
|
||||||
// Some funds are allocated to each node and a number of sectors are presealed in the genesis block.
|
|
||||||
//
|
|
||||||
// The test plan:
|
|
||||||
// One or more clients store content to one or more miners, testing storage deals.
|
|
||||||
// The plan ensures that the storage deals hit the blockchain and measure the time it took.
|
|
||||||
// Verification: one or more clients retrieve and verify the hashes of stored content.
|
|
||||||
// The plan ensures that all (previously) published content can be correctly retrieved
|
|
||||||
// and measures the time it took.
|
|
||||||
//
|
|
||||||
// Preparation of the genesis block: this is the responsibility of the bootstrapper.
|
|
||||||
// In order to compute the genesis block, we need to collect identities and presealed
|
|
||||||
// sectors from each node.
|
|
||||||
// The we create a genesis block that allocates some funds to each node and collects
|
|
||||||
// the presealed sectors.
|
|
||||||
var baselineRoles = map[string]func(*TestEnvironment) error{
|
|
||||||
"bootstrapper": runBootstrapper,
|
|
||||||
"miner": runMiner,
|
|
||||||
"client": runBaselineClient,
|
|
||||||
"drand": runDrandNode,
|
|
||||||
"pubsub-tracer": runPubsubTracer,
|
|
||||||
}
|
|
||||||
|
|
||||||
func runBaselineClient(t *TestEnvironment) error {
|
|
||||||
t.RecordMessage("running client")
|
|
||||||
cl, err := prepareClient(t)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.RecordMessage("got %v miner addrs", len(addrs))
|
|
||||||
|
|
||||||
client := cl.fullApi
|
|
||||||
|
|
||||||
// select a random miner
|
|
||||||
minerAddr := addrs[rand.Intn(len(addrs))]
|
|
||||||
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
|
|
||||||
|
|
||||||
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
// generate random data
|
|
||||||
data := make([]byte, 1600)
|
|
||||||
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
|
|
||||||
|
|
||||||
file, err := ioutil.TempFile("/tmp", "data")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer os.Remove(file.Name())
|
|
||||||
|
|
||||||
_, err = file.Write(data)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.RecordMessage("file cid: %s", fcid)
|
|
||||||
|
|
||||||
// start deal
|
|
||||||
t1 := time.Now()
|
|
||||||
deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
|
|
||||||
t.RecordMessage("started deal: %s", deal)
|
|
||||||
|
|
||||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
t.RecordMessage("waiting for deal to be sealed")
|
|
||||||
waitDealSealed(t, ctx, client, deal)
|
|
||||||
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
|
|
||||||
|
|
||||||
carExport := true
|
|
||||||
|
|
||||||
t.RecordMessage("trying to retrieve %s", fcid)
|
|
||||||
retrieveData(t, ctx, err, client, fcid, carExport, data)
|
|
||||||
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
|
|
||||||
|
|
||||||
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Second) // wait for metrics to be emitted
|
|
||||||
|
|
||||||
// TODO broadcast published content CIDs to other clients
|
|
||||||
// TODO select a random piece of content published by some other client and retrieve it
|
|
||||||
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid {
|
|
||||||
addr, err := client.WalletDefaultAddress(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
|
|
||||||
Data: &storagemarket.DataRef{Root: fcid},
|
|
||||||
Wallet: addr,
|
|
||||||
Miner: minerActorAddr,
|
|
||||||
EpochPrice: types.NewInt(1000000),
|
|
||||||
MinBlocksDuration: 1000,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return deal
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) {
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
di, err := client.ClientGetDealInfo(ctx, *deal)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
switch di.State {
|
|
||||||
case storagemarket.StorageDealProposalRejected:
|
|
||||||
panic("deal rejected")
|
|
||||||
case storagemarket.StorageDealFailing:
|
|
||||||
panic("deal failed")
|
|
||||||
case storagemarket.StorageDealError:
|
|
||||||
panic(fmt.Sprintf("deal errored %s", di.Message))
|
|
||||||
case storagemarket.StorageDealActive:
|
|
||||||
t.RecordMessage("completed deal: %s", di)
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State])
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
|
|
||||||
t1 := time.Now()
|
|
||||||
offers, err := client.ClientFindData(ctx, fcid)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
for _, o := range offers {
|
|
||||||
t.D().Counter(fmt.Sprintf("find-data.offer,miner=%s", o.Miner)).Inc(1)
|
|
||||||
}
|
|
||||||
t.D().ResettingHistogram("find-data").Update(int64(time.Since(t1)))
|
|
||||||
|
|
||||||
if len(offers) < 1 {
|
|
||||||
panic("no offers")
|
|
||||||
}
|
|
||||||
|
|
||||||
rpath, err := ioutil.TempDir("", "lotus-retrieve-test-")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(rpath)
|
|
||||||
|
|
||||||
caddr, err := client.WalletDefaultAddress(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ref := &api.FileRef{
|
|
||||||
Path: filepath.Join(rpath, "ret"),
|
|
||||||
IsCAR: carExport,
|
|
||||||
}
|
|
||||||
t1 = time.Now()
|
|
||||||
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
t.D().ResettingHistogram("retrieve-data").Update(int64(time.Since(t1)))
|
|
||||||
|
|
||||||
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if carExport {
|
|
||||||
rdata = extractCarData(ctx, rdata, rpath)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(rdata, data) {
|
|
||||||
panic("wrong data retrieved")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.RecordMessage("retrieved successfully")
|
|
||||||
}
|
|
||||||
|
|
||||||
func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
|
|
||||||
bserv := dstest.Bserv()
|
|
||||||
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
b, err := bserv.GetBlock(ctx, ch.Roots[0])
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
nd, err := ipld.Decode(b)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
dserv := dag.NewDAGService(bserv)
|
|
||||||
fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
outPath := filepath.Join(rpath, "retLoadedCAR")
|
|
||||||
if err := files.WriteTo(fil, outPath); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
rdata, err = ioutil.ReadFile(outPath)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return rdata
|
|
||||||
}
|
|
@ -1,106 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/testground/sdk-go/sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
func runBootstrapper(t *TestEnvironment) error {
|
|
||||||
t.RecordMessage("running bootstrapper")
|
|
||||||
_, err := prepareBootstrapper(t)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func runMiner(t *TestEnvironment) error {
|
|
||||||
t.RecordMessage("running miner")
|
|
||||||
miner, err := prepareMiner(t)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
t.RecordMessage("block delay: %v", build.BlockDelay)
|
|
||||||
t.D().Gauge("miner.block-delay").Update(build.BlockDelay)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
clients := t.IntParam("clients")
|
|
||||||
miners := t.IntParam("miners")
|
|
||||||
|
|
||||||
myActorAddr, err := miner.minerApi.ActorAddress(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// mine / stop mining
|
|
||||||
mine := true
|
|
||||||
done := make(chan struct{})
|
|
||||||
|
|
||||||
if miner.MineOne != nil {
|
|
||||||
go func() {
|
|
||||||
defer t.RecordMessage("shutting down mining")
|
|
||||||
defer close(done)
|
|
||||||
|
|
||||||
var i int
|
|
||||||
for i = 0; mine; i++ {
|
|
||||||
// synchronize all miners to mine the next block
|
|
||||||
t.RecordMessage("synchronizing all miners to mine next block [%d]", i)
|
|
||||||
stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i))
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
|
|
||||||
|
|
||||||
ch := make(chan struct{})
|
|
||||||
err := miner.MineOne(ctx, func(mined bool) {
|
|
||||||
if mined {
|
|
||||||
t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1)
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
<-ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// signal the last block to make sure no miners are left stuck waiting for the next block signal
|
|
||||||
// while the others have stopped
|
|
||||||
stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i))
|
|
||||||
t.SyncClient.MustSignalEntry(ctx, stateMineLast)
|
|
||||||
}()
|
|
||||||
} else {
|
|
||||||
close(done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for a signal from all clients to stop mining
|
|
||||||
err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
mine = false
|
|
||||||
<-done
|
|
||||||
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func runDrandNode(t *TestEnvironment) error {
|
|
||||||
t.RecordMessage("running drand node")
|
|
||||||
dr, err := prepareDrandNode(t)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer dr.Cleanup()
|
|
||||||
|
|
||||||
// TODO add ability to halt / recover on demand
|
|
||||||
ctx := context.Background()
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
|
||||||
return nil
|
|
||||||
}
|
|
55
lotus-soup/deals.go
Normal file
55
lotus-soup/deals.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid {
|
||||||
|
addr, err := client.WalletDefaultAddress(ctx)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{
|
||||||
|
Data: &storagemarket.DataRef{Root: fcid},
|
||||||
|
Wallet: addr,
|
||||||
|
Miner: minerActorAddr,
|
||||||
|
EpochPrice: types.NewInt(1000000),
|
||||||
|
MinBlocksDuration: 1000,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return deal
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) {
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
di, err := client.ClientGetDealInfo(ctx, *deal)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
switch di.State {
|
||||||
|
case storagemarket.StorageDealProposalRejected:
|
||||||
|
panic("deal rejected")
|
||||||
|
case storagemarket.StorageDealFailing:
|
||||||
|
panic("deal failed")
|
||||||
|
case storagemarket.StorageDealError:
|
||||||
|
panic(fmt.Sprintf("deal errored %s", di.Message))
|
||||||
|
case storagemarket.StorageDealActive:
|
||||||
|
t.RecordMessage("completed deal: %s", di)
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State])
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
67
lotus-soup/lotus_opts.go
Normal file
67
lotus-soup/lotus_opts.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/lp2p"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func withGenesis(gb []byte) node.Option {
|
||||||
|
return node.Override(new(modules.Genesis), modules.LoadGenesis(gb))
|
||||||
|
}
|
||||||
|
|
||||||
|
func withBootstrapper(ab []byte) node.Option {
|
||||||
|
return node.Override(new(dtypes.BootstrapPeers),
|
||||||
|
func() (dtypes.BootstrapPeers, error) {
|
||||||
|
if ab == nil {
|
||||||
|
return dtypes.BootstrapPeers{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := ma.NewMultiaddrBytes(ab)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ai, err := peer.AddrInfoFromP2pAddr(a)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return dtypes.BootstrapPeers{*ai}, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option {
|
||||||
|
return node.Override(new(*config.Pubsub), func() *config.Pubsub {
|
||||||
|
return &config.Pubsub{
|
||||||
|
Bootstrapper: bootstrapper,
|
||||||
|
RemoteTracer: pubsubTracer,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func withListenAddress(ip string) node.Option {
|
||||||
|
addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4001", ip)}
|
||||||
|
return node.Override(node.StartListeningKey, lp2p.StartListening(addrs))
|
||||||
|
}
|
||||||
|
|
||||||
|
func withMinerListenAddress(ip string) node.Option {
|
||||||
|
addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)}
|
||||||
|
return node.Override(node.StartListeningKey, lp2p.StartListening(addrs))
|
||||||
|
}
|
||||||
|
|
||||||
|
func withApiEndpoint(addr string) node.Option {
|
||||||
|
return node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {
|
||||||
|
apima, err := ma.NewMultiaddr(addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return lr.SetAPIEndpoint(apima)
|
||||||
|
})
|
||||||
|
}
|
@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var testplans = map[string]interface{}{
|
var testplans = map[string]interface{}{
|
||||||
"lotus-baseline": doRun(baselineRoles),
|
"lotus-baseline": doRun(basicRoles),
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -1,62 +1,35 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-jsonrpc"
|
|
||||||
"github.com/filecoin-project/go-jsonrpc/auth"
|
|
||||||
"github.com/filecoin-project/go-storedcounter"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/api/apistruct"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
|
||||||
"github.com/filecoin-project/lotus/chain/beacon"
|
"github.com/filecoin-project/lotus/chain/beacon"
|
||||||
genesis_chain "github.com/filecoin-project/lotus/chain/gen/genesis"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
|
||||||
"github.com/filecoin-project/lotus/genesis"
|
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
|
||||||
"github.com/filecoin-project/lotus/node"
|
"github.com/filecoin-project/lotus/node"
|
||||||
"github.com/filecoin-project/lotus/node/config"
|
|
||||||
"github.com/filecoin-project/lotus/node/impl"
|
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/lp2p"
|
|
||||||
modtest "github.com/filecoin-project/lotus/node/modules/testing"
|
modtest "github.com/filecoin-project/lotus/node/modules/testing"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
|
||||||
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||||
"github.com/filecoin-project/specs-actors/actors/crypto"
|
|
||||||
"github.com/gorilla/mux"
|
|
||||||
"github.com/ipfs/go-datastore"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
influxdb "github.com/kpacha/opencensus-influxdb"
|
influxdb "github.com/kpacha/opencensus-influxdb"
|
||||||
|
|
||||||
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/multiformats/go-multiaddr"
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
|
||||||
manet "github.com/multiformats/go-multiaddr-net"
|
manet "github.com/multiformats/go-multiaddr-net"
|
||||||
"github.com/testground/sdk-go/run"
|
"github.com/testground/sdk-go/run"
|
||||||
"github.com/testground/sdk-go/runtime"
|
"github.com/testground/sdk-go/runtime"
|
||||||
"github.com/testground/sdk-go/sync"
|
|
||||||
|
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
"go.opencensus.io/stats/view"
|
"go.opencensus.io/stats/view"
|
||||||
)
|
)
|
||||||
@ -76,21 +49,7 @@ func init() {
|
|||||||
verifreg.MinVerifiedDealSize = big.NewInt(256)
|
verifreg.MinVerifiedDealSize = big.NewInt(256)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var PrepareNodeTimeout = time.Minute
|
||||||
PrepareNodeTimeout = time.Minute
|
|
||||||
|
|
||||||
genesisTopic = sync.NewTopic("genesis", &GenesisMsg{})
|
|
||||||
balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
|
|
||||||
presealTopic = sync.NewTopic("preseal", &PresealMsg{})
|
|
||||||
|
|
||||||
clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{})
|
|
||||||
minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddresses{})
|
|
||||||
|
|
||||||
stateReady = sync.State("ready")
|
|
||||||
stateDone = sync.State("done")
|
|
||||||
stateStopMining = sync.State("stop-mining")
|
|
||||||
stateMinerPickSeqNum = sync.State("miner-pick-seq-num")
|
|
||||||
)
|
|
||||||
|
|
||||||
type TestEnvironment struct {
|
type TestEnvironment struct {
|
||||||
*runtime.RunEnv
|
*runtime.RunEnv
|
||||||
@ -117,482 +76,6 @@ type Node struct {
|
|||||||
MineOne func(context.Context, func(bool)) error
|
MineOne func(context.Context, func(bool)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type InitialBalanceMsg struct {
|
|
||||||
Addr address.Address
|
|
||||||
Balance int
|
|
||||||
}
|
|
||||||
|
|
||||||
type PresealMsg struct {
|
|
||||||
Miner genesis.Miner
|
|
||||||
Seqno int64
|
|
||||||
}
|
|
||||||
|
|
||||||
type GenesisMsg struct {
|
|
||||||
Genesis []byte
|
|
||||||
Bootstrapper []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type MinerAddresses struct {
|
|
||||||
PeerAddr peer.AddrInfo
|
|
||||||
ActorAddr address.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
clients := t.IntParam("clients")
|
|
||||||
miners := t.IntParam("miners")
|
|
||||||
nodes := clients + miners
|
|
||||||
|
|
||||||
drandOpt, err := getDrandConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// the first duty of the boostrapper is to construct the genesis block
|
|
||||||
// first collect all client and miner balances to assign initial funds
|
|
||||||
balances, err := waitForBalances(t, ctx, nodes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// then collect all preseals from miners
|
|
||||||
preseals, err := collectPreseals(t, ctx, miners)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// now construct the genesis block
|
|
||||||
var genesisActors []genesis.Actor
|
|
||||||
var genesisMiners []genesis.Miner
|
|
||||||
|
|
||||||
for _, bm := range balances {
|
|
||||||
genesisActors = append(genesisActors,
|
|
||||||
genesis.Actor{
|
|
||||||
Type: genesis.TAccount,
|
|
||||||
Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)),
|
|
||||||
Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, pm := range preseals {
|
|
||||||
genesisMiners = append(genesisMiners, pm.Miner)
|
|
||||||
}
|
|
||||||
|
|
||||||
genesisTemplate := genesis.Template{
|
|
||||||
Accounts: genesisActors,
|
|
||||||
Miners: genesisMiners,
|
|
||||||
Timestamp: uint64(time.Now().Unix()) - uint64(t.IntParam("genesis_timestamp_offset")), // this needs to be in the past
|
|
||||||
}
|
|
||||||
|
|
||||||
// dump the genesis block
|
|
||||||
// var jsonBuf bytes.Buffer
|
|
||||||
// jsonEnc := json.NewEncoder(&jsonBuf)
|
|
||||||
// err := jsonEnc.Encode(genesisTemplate)
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
// runenv.RecordMessage(fmt.Sprintf("Genesis template: %s", string(jsonBuf.Bytes())))
|
|
||||||
|
|
||||||
// this is horrendously disgusting, we use this contraption to side effect the construction
|
|
||||||
// of the genesis block in the buffer -- yes, a side effect of dependency injection.
|
|
||||||
// I remember when software was straightforward...
|
|
||||||
var genesisBuffer bytes.Buffer
|
|
||||||
|
|
||||||
bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String()
|
|
||||||
|
|
||||||
n := &Node{}
|
|
||||||
stop, err := node.New(context.Background(),
|
|
||||||
node.FullAPI(&n.fullApi),
|
|
||||||
node.Online(),
|
|
||||||
node.Repo(repo.NewMemory(nil)),
|
|
||||||
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
|
|
||||||
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
|
||||||
withListenAddress(bootstrapperIP),
|
|
||||||
withBootstrapper(nil),
|
|
||||||
withPubsubConfig(true, pubsubTracer),
|
|
||||||
drandOpt,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
n.stop = stop
|
|
||||||
|
|
||||||
var bootstrapperAddr ma.Multiaddr
|
|
||||||
|
|
||||||
bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
|
||||||
if err != nil {
|
|
||||||
stop(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, a := range bootstrapperAddrs.Addrs {
|
|
||||||
ip, err := a.ValueForProtocol(ma.P_IP4)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if ip != bootstrapperIP {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
addrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{
|
|
||||||
ID: bootstrapperAddrs.ID,
|
|
||||||
Addrs: []ma.Multiaddr{a},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
bootstrapperAddr = addrs[0]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if bootstrapperAddr == nil {
|
|
||||||
panic("failed to determine bootstrapper address")
|
|
||||||
}
|
|
||||||
|
|
||||||
genesisMsg := &GenesisMsg{
|
|
||||||
Genesis: genesisBuffer.Bytes(),
|
|
||||||
Bootstrapper: bootstrapperAddr.Bytes(),
|
|
||||||
}
|
|
||||||
t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg)
|
|
||||||
|
|
||||||
t.RecordMessage("waiting for all nodes to be ready")
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
|
||||||
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareMiner(t *TestEnvironment) (*Node, error) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
drandOpt, err := getDrandConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// first create a wallet
|
|
||||||
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// publish the account ID/balance
|
|
||||||
balance := t.IntParam("balance")
|
|
||||||
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
|
|
||||||
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
|
|
||||||
|
|
||||||
// create and publish the preseal commitment
|
|
||||||
priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
minerID, err := peer.IDFromPrivateKey(priv)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// pick unique sequence number for each miner, no matter in which group they are
|
|
||||||
seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners"))
|
|
||||||
|
|
||||||
minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
presealDir, err := ioutil.TempDir("", "preseal")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sectors := t.IntParam("sectors")
|
|
||||||
genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
genMiner.PeerId = minerID
|
|
||||||
|
|
||||||
t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker)
|
|
||||||
|
|
||||||
presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq}
|
|
||||||
t.SyncClient.Publish(ctx, presealTopic, presealMsg)
|
|
||||||
|
|
||||||
// then collect the genesis block and bootstrapper address
|
|
||||||
genesisMsg, err := waitForGenesis(t, ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare the repo
|
|
||||||
minerRepo := repo.NewMemory(nil)
|
|
||||||
|
|
||||||
lr, err := minerRepo.Lock(repo.StorageMiner)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ks, err := lr.KeyStore()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
kbytes, err := priv.Bytes()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ks.Put("libp2p-host", types.KeyInfo{
|
|
||||||
Type: "libp2p-host",
|
|
||||||
PrivateKey: kbytes,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ds, err := lr.Datastore("/metadata")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ds.Put(datastore.NewKey("miner-address"), minerAddr.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix))
|
|
||||||
for i := 0; i < (sectors + 1); i++ {
|
|
||||||
_, err = nic.Next()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = lr.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
minerIP := t.NetClient.MustGetDataNetworkIP().String()
|
|
||||||
|
|
||||||
// create the node
|
|
||||||
// we need both a full node _and_ and storage miner node
|
|
||||||
n := &Node{}
|
|
||||||
|
|
||||||
nodeRepo := repo.NewMemory(nil)
|
|
||||||
|
|
||||||
stop1, err := node.New(context.Background(),
|
|
||||||
node.FullAPI(&n.fullApi),
|
|
||||||
node.Online(),
|
|
||||||
node.Repo(nodeRepo),
|
|
||||||
withGenesis(genesisMsg.Genesis),
|
|
||||||
withListenAddress(minerIP),
|
|
||||||
withBootstrapper(genesisMsg.Bootstrapper),
|
|
||||||
withPubsubConfig(false, pubsubTracer),
|
|
||||||
drandOpt,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the wallet
|
|
||||||
err = n.setWallet(ctx, walletKey)
|
|
||||||
if err != nil {
|
|
||||||
stop1(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
minerOpts := []node.Option{
|
|
||||||
node.StorageMiner(&n.minerApi),
|
|
||||||
node.Online(),
|
|
||||||
node.Repo(minerRepo),
|
|
||||||
node.Override(new(api.FullNode), n.fullApi),
|
|
||||||
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
|
||||||
withMinerListenAddress(minerIP),
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.StringParam("mining_mode") != "natural" {
|
|
||||||
mineBlock := make(chan func(bool))
|
|
||||||
minerOpts = append(minerOpts,
|
|
||||||
node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr)))
|
|
||||||
n.MineOne = func(ctx context.Context, cb func(bool)) error {
|
|
||||||
select {
|
|
||||||
case mineBlock <- cb:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stop2, err := node.New(context.Background(), minerOpts...)
|
|
||||||
if err != nil {
|
|
||||||
stop1(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
n.stop = func(ctx context.Context) error {
|
|
||||||
// TODO use a multierror for this
|
|
||||||
err2 := stop2(ctx)
|
|
||||||
err1 := stop1(ctx)
|
|
||||||
if err2 != nil {
|
|
||||||
return err2
|
|
||||||
}
|
|
||||||
return err1
|
|
||||||
}
|
|
||||||
|
|
||||||
registerAndExportMetrics(minerAddr.String())
|
|
||||||
|
|
||||||
// Bootstrap with full node
|
|
||||||
remoteAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = n.minerApi.NetConnect(ctx, remoteAddrs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = startStorMinerAPIServer(minerRepo, n.minerApi)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// add local storage for presealed sectors
|
|
||||||
err = n.minerApi.StorageAddLocal(ctx, presealDir)
|
|
||||||
if err != nil {
|
|
||||||
n.stop(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the miner PeerID
|
|
||||||
minerIDEncoded, err := actors.SerializeParams(&saminer.ChangePeerIDParams{NewID: abi.PeerID(minerID)})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
changeMinerID := &types.Message{
|
|
||||||
To: minerAddr,
|
|
||||||
From: genMiner.Worker,
|
|
||||||
Method: builtin.MethodsMiner.ChangePeerID,
|
|
||||||
Params: minerIDEncoded,
|
|
||||||
Value: types.NewInt(0),
|
|
||||||
GasPrice: types.NewInt(0),
|
|
||||||
GasLimit: 1000000,
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID)
|
|
||||||
if err != nil {
|
|
||||||
n.stop(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
t.RecordMessage("publish our address to the miners addr topic")
|
|
||||||
actoraddress, err := n.minerApi.ActorAddress(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
addrinfo, err := n.minerApi.NetAddrsListen(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddresses{addrinfo, actoraddress})
|
|
||||||
|
|
||||||
t.RecordMessage("waiting for all nodes to be ready")
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
|
||||||
|
|
||||||
return n, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareClient(t *TestEnvironment) (*Node, error) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
drandOpt, err := getDrandConfig(ctx, t)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// first create a wallet
|
|
||||||
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// publish the account ID/balance
|
|
||||||
balance := t.IntParam("balance")
|
|
||||||
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
|
|
||||||
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
|
|
||||||
|
|
||||||
// then collect the genesis block and bootstrapper address
|
|
||||||
genesisMsg, err := waitForGenesis(t, ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
clientIP := t.NetClient.MustGetDataNetworkIP().String()
|
|
||||||
|
|
||||||
nodeRepo := repo.NewMemory(nil)
|
|
||||||
|
|
||||||
// create the node
|
|
||||||
n := &Node{}
|
|
||||||
stop, err := node.New(context.Background(),
|
|
||||||
node.FullAPI(&n.fullApi),
|
|
||||||
node.Online(),
|
|
||||||
node.Repo(nodeRepo),
|
|
||||||
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
|
||||||
withGenesis(genesisMsg.Genesis),
|
|
||||||
withListenAddress(clientIP),
|
|
||||||
withBootstrapper(genesisMsg.Bootstrapper),
|
|
||||||
withPubsubConfig(false, pubsubTracer),
|
|
||||||
drandOpt,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
n.stop = stop
|
|
||||||
|
|
||||||
// set the wallet
|
|
||||||
err = n.setWallet(ctx, walletKey)
|
|
||||||
if err != nil {
|
|
||||||
stop(context.TODO())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = startClientAPIServer(nodeRepo, n.fullApi)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
|
|
||||||
|
|
||||||
t.RecordMessage("publish our address to the clients addr topic")
|
|
||||||
addrinfo, err := n.fullApi.NetAddrsListen(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo)
|
|
||||||
|
|
||||||
t.RecordMessage("waiting for all nodes to be ready")
|
|
||||||
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
|
||||||
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error {
|
func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error {
|
||||||
_, err := n.fullApi.WalletImport(ctx, &walletKey.KeyInfo)
|
_, err := n.fullApi.WalletImport(ctx, &walletKey.KeyInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -607,58 +90,6 @@ func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func withGenesis(gb []byte) node.Option {
|
|
||||||
return node.Override(new(modules.Genesis), modules.LoadGenesis(gb))
|
|
||||||
}
|
|
||||||
|
|
||||||
func withBootstrapper(ab []byte) node.Option {
|
|
||||||
return node.Override(new(dtypes.BootstrapPeers),
|
|
||||||
func() (dtypes.BootstrapPeers, error) {
|
|
||||||
if ab == nil {
|
|
||||||
return dtypes.BootstrapPeers{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
a, err := ma.NewMultiaddrBytes(ab)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ai, err := peer.AddrInfoFromP2pAddr(a)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return dtypes.BootstrapPeers{*ai}, nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option {
|
|
||||||
return node.Override(new(*config.Pubsub), func() *config.Pubsub {
|
|
||||||
return &config.Pubsub{
|
|
||||||
Bootstrapper: bootstrapper,
|
|
||||||
RemoteTracer: pubsubTracer,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func withListenAddress(ip string) node.Option {
|
|
||||||
addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4001", ip)}
|
|
||||||
return node.Override(node.StartListeningKey, lp2p.StartListening(addrs))
|
|
||||||
}
|
|
||||||
|
|
||||||
func withMinerListenAddress(ip string) node.Option {
|
|
||||||
addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)}
|
|
||||||
return node.Override(node.StartListeningKey, lp2p.StartListening(addrs))
|
|
||||||
}
|
|
||||||
|
|
||||||
func withApiEndpoint(addr string) node.Option {
|
|
||||||
return node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {
|
|
||||||
apima, err := multiaddr.NewMultiaddr(addr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return lr.SetAPIEndpoint(apima)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
|
func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
|
||||||
ch := make(chan *InitialBalanceMsg)
|
ch := make(chan *InitialBalanceMsg)
|
||||||
sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch)
|
sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch)
|
||||||
@ -709,11 +140,11 @@ func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) {
|
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) {
|
||||||
ch := make(chan MinerAddresses)
|
ch := make(chan MinerAddressesMsg)
|
||||||
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch)
|
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch)
|
||||||
|
|
||||||
addrs := make([]MinerAddresses, 0, miners)
|
addrs := make([]MinerAddressesMsg, 0, miners)
|
||||||
for i := 0; i < miners; i++ {
|
for i := 0; i < miners; i++ {
|
||||||
select {
|
select {
|
||||||
case a := <-ch:
|
case a := <-ch:
|
||||||
@ -743,7 +174,7 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([
|
|||||||
return addrs, nil
|
return addrs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) {
|
func getPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) {
|
||||||
if !t.BooleanParam("enable_pubsub_tracer") {
|
if !t.BooleanParam("enable_pubsub_tracer") {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
@ -753,13 +184,13 @@ func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, err
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case m := <-ch:
|
case m := <-ch:
|
||||||
return m.Tracer, nil
|
return m.Multiaddr, nil
|
||||||
case err := <-sub.Done():
|
case err := <-sub.Done():
|
||||||
return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err)
|
return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) {
|
func getDrandOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) {
|
||||||
beaconType := t.StringParam("random_beacon_type")
|
beaconType := t.StringParam("random_beacon_type")
|
||||||
switch beaconType {
|
switch beaconType {
|
||||||
case "external-drand":
|
case "external-drand":
|
||||||
@ -795,42 +226,6 @@ func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startStorMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error {
|
|
||||||
mux := mux.NewRouter()
|
|
||||||
|
|
||||||
rpcServer := jsonrpc.NewServer()
|
|
||||||
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerApi))
|
|
||||||
|
|
||||||
mux.Handle("/rpc/v0", rpcServer)
|
|
||||||
mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote)
|
|
||||||
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
|
|
||||||
|
|
||||||
ah := &auth.Handler{
|
|
||||||
Verify: minerApi.AuthVerify,
|
|
||||||
Next: mux.ServeHTTP,
|
|
||||||
}
|
|
||||||
|
|
||||||
srv := &http.Server{Handler: ah}
|
|
||||||
|
|
||||||
return startServer(repo, srv)
|
|
||||||
}
|
|
||||||
|
|
||||||
func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error {
|
|
||||||
rpcServer := jsonrpc.NewServer()
|
|
||||||
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api))
|
|
||||||
|
|
||||||
ah := &auth.Handler{
|
|
||||||
Verify: api.AuthVerify,
|
|
||||||
Next: rpcServer.ServeHTTP,
|
|
||||||
}
|
|
||||||
|
|
||||||
http.Handle("/rpc/v0", ah)
|
|
||||||
|
|
||||||
srv := &http.Server{Handler: http.DefaultServeMux}
|
|
||||||
|
|
||||||
return startServer(repo, srv)
|
|
||||||
}
|
|
||||||
|
|
||||||
func startServer(repo *repo.MemRepo, srv *http.Server) error {
|
func startServer(repo *repo.MemRepo, srv *http.Server) error {
|
||||||
endpoint, err := repo.APIEndpoint()
|
endpoint, err := repo.APIEndpoint()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
103
lotus-soup/retrieval.go
Normal file
103
lotus-soup/retrieval.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
|
ipld "github.com/ipfs/go-ipld-format"
|
||||||
|
dag "github.com/ipfs/go-merkledag"
|
||||||
|
dstest "github.com/ipfs/go-merkledag/test"
|
||||||
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
|
"github.com/ipld/go-car"
|
||||||
|
)
|
||||||
|
|
||||||
|
func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
|
||||||
|
t1 := time.Now()
|
||||||
|
offers, err := client.ClientFindData(ctx, fcid)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
for _, o := range offers {
|
||||||
|
t.D().Counter(fmt.Sprintf("find-data.offer,miner=%s", o.Miner)).Inc(1)
|
||||||
|
}
|
||||||
|
t.D().ResettingHistogram("find-data").Update(int64(time.Since(t1)))
|
||||||
|
|
||||||
|
if len(offers) < 1 {
|
||||||
|
panic("no offers")
|
||||||
|
}
|
||||||
|
|
||||||
|
rpath, err := ioutil.TempDir("", "lotus-retrieve-test-")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(rpath)
|
||||||
|
|
||||||
|
caddr, err := client.WalletDefaultAddress(ctx)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ref := &api.FileRef{
|
||||||
|
Path: filepath.Join(rpath, "ret"),
|
||||||
|
IsCAR: carExport,
|
||||||
|
}
|
||||||
|
t1 = time.Now()
|
||||||
|
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
t.D().ResettingHistogram("retrieve-data").Update(int64(time.Since(t1)))
|
||||||
|
|
||||||
|
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if carExport {
|
||||||
|
rdata = extractCarData(ctx, rdata, rpath)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(rdata, data) {
|
||||||
|
panic("wrong data retrieved")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.RecordMessage("retrieved successfully")
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
|
||||||
|
bserv := dstest.Bserv()
|
||||||
|
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
b, err := bserv.GetBlock(ctx, ch.Roots[0])
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
nd, err := ipld.Decode(b)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
dserv := dag.NewDAGService(bserv)
|
||||||
|
fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
outPath := filepath.Join(rpath, "retLoadedCAR")
|
||||||
|
if err := files.WriteTo(fil, outPath); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
rdata, err = ioutil.ReadFile(outPath)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return rdata
|
||||||
|
}
|
161
lotus-soup/role_bootstrapper.go
Normal file
161
lotus-soup/role_bootstrapper.go
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
modtest "github.com/filecoin-project/lotus/node/modules/testing"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func runBootstrapper(t *TestEnvironment) error {
|
||||||
|
t.RecordMessage("running bootstrapper")
|
||||||
|
_, err := prepareBootstrapper(t)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerMaddr(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
clients := t.IntParam("clients")
|
||||||
|
miners := t.IntParam("miners")
|
||||||
|
nodes := clients + miners
|
||||||
|
|
||||||
|
drandOpt, err := getDrandOpts(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// the first duty of the boostrapper is to construct the genesis block
|
||||||
|
// first collect all client and miner balances to assign initial funds
|
||||||
|
balances, err := waitForBalances(t, ctx, nodes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// then collect all preseals from miners
|
||||||
|
preseals, err := collectPreseals(t, ctx, miners)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// now construct the genesis block
|
||||||
|
var genesisActors []genesis.Actor
|
||||||
|
var genesisMiners []genesis.Miner
|
||||||
|
|
||||||
|
for _, bm := range balances {
|
||||||
|
genesisActors = append(genesisActors,
|
||||||
|
genesis.Actor{
|
||||||
|
Type: genesis.TAccount,
|
||||||
|
Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)),
|
||||||
|
Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pm := range preseals {
|
||||||
|
genesisMiners = append(genesisMiners, pm.Miner)
|
||||||
|
}
|
||||||
|
|
||||||
|
genesisTemplate := genesis.Template{
|
||||||
|
Accounts: genesisActors,
|
||||||
|
Miners: genesisMiners,
|
||||||
|
Timestamp: uint64(time.Now().Unix()) - uint64(t.IntParam("genesis_timestamp_offset")), // this needs to be in the past
|
||||||
|
}
|
||||||
|
|
||||||
|
// dump the genesis block
|
||||||
|
// var jsonBuf bytes.Buffer
|
||||||
|
// jsonEnc := json.NewEncoder(&jsonBuf)
|
||||||
|
// err := jsonEnc.Encode(genesisTemplate)
|
||||||
|
// if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
// }
|
||||||
|
// runenv.RecordMessage(fmt.Sprintf("Genesis template: %s", string(jsonBuf.Bytes())))
|
||||||
|
|
||||||
|
// this is horrendously disgusting, we use this contraption to side effect the construction
|
||||||
|
// of the genesis block in the buffer -- yes, a side effect of dependency injection.
|
||||||
|
// I remember when software was straightforward...
|
||||||
|
var genesisBuffer bytes.Buffer
|
||||||
|
|
||||||
|
bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String()
|
||||||
|
|
||||||
|
n := &Node{}
|
||||||
|
stop, err := node.New(context.Background(),
|
||||||
|
node.FullAPI(&n.fullApi),
|
||||||
|
node.Online(),
|
||||||
|
node.Repo(repo.NewMemory(nil)),
|
||||||
|
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
|
||||||
|
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
||||||
|
withListenAddress(bootstrapperIP),
|
||||||
|
withBootstrapper(nil),
|
||||||
|
withPubsubConfig(true, pubsubTracer),
|
||||||
|
drandOpt,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
n.stop = stop
|
||||||
|
|
||||||
|
var bootstrapperAddr ma.Multiaddr
|
||||||
|
|
||||||
|
bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
||||||
|
if err != nil {
|
||||||
|
stop(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, a := range bootstrapperAddrs.Addrs {
|
||||||
|
ip, err := a.ValueForProtocol(ma.P_IP4)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ip != bootstrapperIP {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
addrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{
|
||||||
|
ID: bootstrapperAddrs.ID,
|
||||||
|
Addrs: []ma.Multiaddr{a},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bootstrapperAddr = addrs[0]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if bootstrapperAddr == nil {
|
||||||
|
panic("failed to determine bootstrapper address")
|
||||||
|
}
|
||||||
|
|
||||||
|
genesisMsg := &GenesisMsg{
|
||||||
|
Genesis: genesisBuffer.Bytes(),
|
||||||
|
Bootstrapper: bootstrapperAddr.Bytes(),
|
||||||
|
}
|
||||||
|
t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg)
|
||||||
|
|
||||||
|
t.RecordMessage("waiting for all nodes to be ready")
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
194
lotus-soup/role_client.go
Normal file
194
lotus-soup/role_client.go
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-jsonrpc"
|
||||||
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/api/apistruct"
|
||||||
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||||
|
)
|
||||||
|
|
||||||
|
func runBaselineClient(t *TestEnvironment) error {
|
||||||
|
t.RecordMessage("running client")
|
||||||
|
cl, err := prepareClient(t)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.RecordMessage("got %v miner addrs", len(addrs))
|
||||||
|
|
||||||
|
client := cl.fullApi
|
||||||
|
|
||||||
|
// select a random miner
|
||||||
|
minerAddr := addrs[rand.Intn(len(addrs))]
|
||||||
|
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
|
||||||
|
|
||||||
|
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// generate 1600 bytes of random data
|
||||||
|
data := make([]byte, 1600)
|
||||||
|
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
|
||||||
|
|
||||||
|
file, err := ioutil.TempFile("/tmp", "data")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer os.Remove(file.Name())
|
||||||
|
|
||||||
|
_, err = file.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.RecordMessage("file cid: %s", fcid)
|
||||||
|
|
||||||
|
// start deal
|
||||||
|
t1 := time.Now()
|
||||||
|
deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
|
||||||
|
t.RecordMessage("started deal: %s", deal)
|
||||||
|
|
||||||
|
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
t.RecordMessage("waiting for deal to be sealed")
|
||||||
|
waitDealSealed(t, ctx, client, deal)
|
||||||
|
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
|
||||||
|
|
||||||
|
carExport := true
|
||||||
|
|
||||||
|
t.RecordMessage("trying to retrieve %s", fcid)
|
||||||
|
retrieveData(t, ctx, err, client, fcid, carExport, data)
|
||||||
|
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
|
||||||
|
|
||||||
|
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Second) // wait for metrics to be emitted
|
||||||
|
|
||||||
|
// TODO broadcast published content CIDs to other clients
|
||||||
|
// TODO select a random piece of content published by some other client and retrieve it
|
||||||
|
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareClient(t *TestEnvironment) (*Node, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerMaddr(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
drandOpt, err := getDrandOpts(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// first create a wallet
|
||||||
|
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// publish the account ID/balance
|
||||||
|
balance := t.IntParam("balance")
|
||||||
|
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
|
||||||
|
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
|
||||||
|
|
||||||
|
// then collect the genesis block and bootstrapper address
|
||||||
|
genesisMsg, err := waitForGenesis(t, ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
clientIP := t.NetClient.MustGetDataNetworkIP().String()
|
||||||
|
|
||||||
|
nodeRepo := repo.NewMemory(nil)
|
||||||
|
|
||||||
|
// create the node
|
||||||
|
n := &Node{}
|
||||||
|
stop, err := node.New(context.Background(),
|
||||||
|
node.FullAPI(&n.fullApi),
|
||||||
|
node.Online(),
|
||||||
|
node.Repo(nodeRepo),
|
||||||
|
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
||||||
|
withGenesis(genesisMsg.Genesis),
|
||||||
|
withListenAddress(clientIP),
|
||||||
|
withBootstrapper(genesisMsg.Bootstrapper),
|
||||||
|
withPubsubConfig(false, pubsubTracer),
|
||||||
|
drandOpt,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
n.stop = stop
|
||||||
|
|
||||||
|
// set the wallet
|
||||||
|
err = n.setWallet(ctx, walletKey)
|
||||||
|
if err != nil {
|
||||||
|
stop(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = startClientAPIServer(nodeRepo, n.fullApi)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
|
||||||
|
|
||||||
|
t.RecordMessage("publish our address to the clients addr topic")
|
||||||
|
addrinfo, err := n.fullApi.NetAddrsListen(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo)
|
||||||
|
|
||||||
|
t.RecordMessage("waiting for all nodes to be ready")
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error {
|
||||||
|
rpcServer := jsonrpc.NewServer()
|
||||||
|
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api))
|
||||||
|
|
||||||
|
ah := &auth.Handler{
|
||||||
|
Verify: api.AuthVerify,
|
||||||
|
Next: rpcServer.ServeHTTP,
|
||||||
|
}
|
||||||
|
|
||||||
|
http.Handle("/rpc/v0", ah)
|
||||||
|
|
||||||
|
srv := &http.Server{Handler: http.DefaultServeMux}
|
||||||
|
|
||||||
|
return startServer(repo, srv)
|
||||||
|
}
|
@ -14,28 +14,19 @@ import (
|
|||||||
|
|
||||||
"github.com/drand/drand/chain"
|
"github.com/drand/drand/chain"
|
||||||
hclient "github.com/drand/drand/client/http"
|
hclient "github.com/drand/drand/client/http"
|
||||||
|
"github.com/drand/drand/demo/node"
|
||||||
"github.com/drand/drand/log"
|
"github.com/drand/drand/log"
|
||||||
"github.com/drand/drand/lp2p"
|
"github.com/drand/drand/lp2p"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
"github.com/testground/sdk-go/sync"
|
"github.com/testground/sdk-go/sync"
|
||||||
|
|
||||||
"github.com/drand/drand/demo/node"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var PrepareDrandTimeout = time.Minute
|
||||||
PrepareDrandTimeout = time.Minute
|
|
||||||
drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{})
|
|
||||||
)
|
|
||||||
|
|
||||||
type DrandRuntimeInfo struct {
|
|
||||||
Config dtypes.DrandConfig
|
|
||||||
GossipBootstrap dtypes.DrandBootstrap
|
|
||||||
}
|
|
||||||
|
|
||||||
type DrandInstance struct {
|
type DrandInstance struct {
|
||||||
Node node.Node
|
Node node.Node
|
||||||
GossipRelay *lp2p.GossipRelayNode
|
GossipRelay *lp2p.GossipRelayNode
|
||||||
|
|
||||||
stateDir string
|
stateDir string
|
||||||
@ -45,17 +36,18 @@ func (d *DrandInstance) Cleanup() error {
|
|||||||
return os.RemoveAll(d.stateDir)
|
return os.RemoveAll(d.stateDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node
|
func runDrandNode(t *TestEnvironment) error {
|
||||||
// you can use the returned dtypes.DrandConfig to override the default production config.
|
t.RecordMessage("running drand node")
|
||||||
func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) {
|
dr, err := prepareDrandNode(t)
|
||||||
ch := make(chan *DrandRuntimeInfo, 1)
|
if err != nil {
|
||||||
sub := client.MustSubscribe(ctx, drandConfigTopic, ch)
|
return err
|
||||||
select {
|
|
||||||
case cfg := <-ch:
|
|
||||||
return cfg, nil
|
|
||||||
case err := <-sub.Done():
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
defer dr.Cleanup()
|
||||||
|
|
||||||
|
// TODO add ability to halt / recover on demand
|
||||||
|
ctx := context.Background()
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group.
|
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group.
|
||||||
@ -228,12 +220,25 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &DrandInstance{
|
return &DrandInstance{
|
||||||
Node: n,
|
Node: n,
|
||||||
GossipRelay: gossipRelay,
|
GossipRelay: gossipRelay,
|
||||||
stateDir: stateDir,
|
stateDir: stateDir,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node
|
||||||
|
// you can use the returned dtypes.DrandConfig to override the default production config.
|
||||||
|
func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) {
|
||||||
|
ch := make(chan *DrandRuntimeInfo, 1)
|
||||||
|
sub := client.MustSubscribe(ctx, drandConfigTopic, ch)
|
||||||
|
select {
|
||||||
|
case cfg := <-ch:
|
||||||
|
return cfg, nil
|
||||||
|
case err := <-sub.Done():
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) {
|
func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) {
|
||||||
for _, a := range addrs {
|
for _, a := range addrs {
|
||||||
if ip, _ := a.ValueForProtocol(ma.P_IP4); ip != dataIP.String() {
|
if ip, _ := a.ValueForProtocol(ma.P_IP4); ip != dataIP.String() {
|
||||||
@ -242,4 +247,4 @@ func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error)
|
|||||||
return peer.AddrInfoFromP2pAddr(a)
|
return peer.AddrInfoFromP2pAddr(a)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs)
|
return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs)
|
||||||
}
|
}
|
377
lotus-soup/role_miner.go
Normal file
377
lotus-soup/role_miner.go
Normal file
@ -0,0 +1,377 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-jsonrpc"
|
||||||
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
||||||
|
"github.com/filecoin-project/go-storedcounter"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/api/apistruct"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
genesis_chain "github.com/filecoin-project/lotus/chain/gen/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||||
|
"github.com/filecoin-project/lotus/miner"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/impl"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
|
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
|
||||||
|
"github.com/testground/sdk-go/sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func runMiner(t *TestEnvironment) error {
|
||||||
|
t.RecordMessage("running miner")
|
||||||
|
miner, err := prepareMiner(t)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.RecordMessage("block delay: %v", build.BlockDelay)
|
||||||
|
t.D().Gauge("miner.block-delay").Update(build.BlockDelay)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
clients := t.IntParam("clients")
|
||||||
|
miners := t.IntParam("miners")
|
||||||
|
|
||||||
|
myActorAddr, err := miner.minerApi.ActorAddress(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// mine / stop mining
|
||||||
|
mine := true
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
if miner.MineOne != nil {
|
||||||
|
go func() {
|
||||||
|
defer t.RecordMessage("shutting down mining")
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
var i int
|
||||||
|
for i = 0; mine; i++ {
|
||||||
|
// synchronize all miners to mine the next block
|
||||||
|
t.RecordMessage("synchronizing all miners to mine next block [%d]", i)
|
||||||
|
stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i))
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
err := miner.MineOne(ctx, func(mined bool) {
|
||||||
|
if mined {
|
||||||
|
t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1)
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// signal the last block to make sure no miners are left stuck waiting for the next block signal
|
||||||
|
// while the others have stopped
|
||||||
|
stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i))
|
||||||
|
t.SyncClient.MustSignalEntry(ctx, stateMineLast)
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
close(done)
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for a signal from all clients to stop mining
|
||||||
|
err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mine = false
|
||||||
|
<-done
|
||||||
|
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareMiner(t *TestEnvironment) (*Node, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerMaddr(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
drandOpt, err := getDrandOpts(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// first create a wallet
|
||||||
|
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// publish the account ID/balance
|
||||||
|
balance := t.IntParam("balance")
|
||||||
|
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
|
||||||
|
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
|
||||||
|
|
||||||
|
// create and publish the preseal commitment
|
||||||
|
priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
minerID, err := peer.IDFromPrivateKey(priv)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// pick unique sequence number for each miner, no matter in which group they are
|
||||||
|
seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners"))
|
||||||
|
|
||||||
|
minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
presealDir, err := ioutil.TempDir("", "preseal")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sectors := t.IntParam("sectors")
|
||||||
|
genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
genMiner.PeerId = minerID
|
||||||
|
|
||||||
|
t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker)
|
||||||
|
|
||||||
|
presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq}
|
||||||
|
t.SyncClient.Publish(ctx, presealTopic, presealMsg)
|
||||||
|
|
||||||
|
// then collect the genesis block and bootstrapper address
|
||||||
|
genesisMsg, err := waitForGenesis(t, ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare the repo
|
||||||
|
minerRepo := repo.NewMemory(nil)
|
||||||
|
|
||||||
|
lr, err := minerRepo.Lock(repo.StorageMiner)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ks, err := lr.KeyStore()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
kbytes, err := priv.Bytes()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ks.Put("libp2p-host", types.KeyInfo{
|
||||||
|
Type: "libp2p-host",
|
||||||
|
PrivateKey: kbytes,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ds, err := lr.Datastore("/metadata")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ds.Put(datastore.NewKey("miner-address"), minerAddr.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix))
|
||||||
|
for i := 0; i < (sectors + 1); i++ {
|
||||||
|
_, err = nic.Next()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = lr.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
minerIP := t.NetClient.MustGetDataNetworkIP().String()
|
||||||
|
|
||||||
|
// create the node
|
||||||
|
// we need both a full node _and_ and storage miner node
|
||||||
|
n := &Node{}
|
||||||
|
|
||||||
|
nodeRepo := repo.NewMemory(nil)
|
||||||
|
|
||||||
|
stop1, err := node.New(context.Background(),
|
||||||
|
node.FullAPI(&n.fullApi),
|
||||||
|
node.Online(),
|
||||||
|
node.Repo(nodeRepo),
|
||||||
|
withGenesis(genesisMsg.Genesis),
|
||||||
|
withListenAddress(minerIP),
|
||||||
|
withBootstrapper(genesisMsg.Bootstrapper),
|
||||||
|
withPubsubConfig(false, pubsubTracer),
|
||||||
|
drandOpt,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the wallet
|
||||||
|
err = n.setWallet(ctx, walletKey)
|
||||||
|
if err != nil {
|
||||||
|
stop1(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
minerOpts := []node.Option{
|
||||||
|
node.StorageMiner(&n.minerApi),
|
||||||
|
node.Online(),
|
||||||
|
node.Repo(minerRepo),
|
||||||
|
node.Override(new(api.FullNode), n.fullApi),
|
||||||
|
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
|
||||||
|
withMinerListenAddress(minerIP),
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.StringParam("mining_mode") != "natural" {
|
||||||
|
mineBlock := make(chan func(bool))
|
||||||
|
minerOpts = append(minerOpts,
|
||||||
|
node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr)))
|
||||||
|
n.MineOne = func(ctx context.Context, cb func(bool)) error {
|
||||||
|
select {
|
||||||
|
case mineBlock <- cb:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stop2, err := node.New(context.Background(), minerOpts...)
|
||||||
|
if err != nil {
|
||||||
|
stop1(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
n.stop = func(ctx context.Context) error {
|
||||||
|
// TODO use a multierror for this
|
||||||
|
err2 := stop2(ctx)
|
||||||
|
err1 := stop1(ctx)
|
||||||
|
if err2 != nil {
|
||||||
|
return err2
|
||||||
|
}
|
||||||
|
return err1
|
||||||
|
}
|
||||||
|
|
||||||
|
registerAndExportMetrics(minerAddr.String())
|
||||||
|
|
||||||
|
// Bootstrap with full node
|
||||||
|
remoteAddrs, err := n.fullApi.NetAddrsListen(ctx)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = n.minerApi.NetConnect(ctx, remoteAddrs)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = startStorMinerAPIServer(minerRepo, n.minerApi)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// add local storage for presealed sectors
|
||||||
|
err = n.minerApi.StorageAddLocal(ctx, presealDir)
|
||||||
|
if err != nil {
|
||||||
|
n.stop(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the miner PeerID
|
||||||
|
minerIDEncoded, err := actors.SerializeParams(&saminer.ChangePeerIDParams{NewID: abi.PeerID(minerID)})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
changeMinerID := &types.Message{
|
||||||
|
To: minerAddr,
|
||||||
|
From: genMiner.Worker,
|
||||||
|
Method: builtin.MethodsMiner.ChangePeerID,
|
||||||
|
Params: minerIDEncoded,
|
||||||
|
Value: types.NewInt(0),
|
||||||
|
GasPrice: types.NewInt(0),
|
||||||
|
GasLimit: 1000000,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID)
|
||||||
|
if err != nil {
|
||||||
|
n.stop(context.TODO())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.RecordMessage("publish our address to the miners addr topic")
|
||||||
|
actoraddress, err := n.minerApi.ActorAddress(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
addrinfo, err := n.minerApi.NetAddrsListen(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress})
|
||||||
|
|
||||||
|
t.RecordMessage("waiting for all nodes to be ready")
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func startStorMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error {
|
||||||
|
mux := mux.NewRouter()
|
||||||
|
|
||||||
|
rpcServer := jsonrpc.NewServer()
|
||||||
|
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerApi))
|
||||||
|
|
||||||
|
mux.Handle("/rpc/v0", rpcServer)
|
||||||
|
mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote)
|
||||||
|
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
|
||||||
|
|
||||||
|
ah := &auth.Handler{
|
||||||
|
Verify: minerApi.AuthVerify,
|
||||||
|
Next: mux.ServeHTTP,
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := &http.Server{Handler: ah}
|
||||||
|
|
||||||
|
return startServer(repo, srv)
|
||||||
|
}
|
@ -5,8 +5,6 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/testground/sdk-go/sync"
|
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p"
|
"github.com/libp2p/go-libp2p"
|
||||||
"github.com/libp2p/go-libp2p-core/crypto"
|
"github.com/libp2p/go-libp2p-core/crypto"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
@ -15,19 +13,11 @@ import (
|
|||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{})
|
|
||||||
)
|
|
||||||
|
|
||||||
type PubsubTracer struct {
|
type PubsubTracer struct {
|
||||||
host host.Host
|
host host.Host
|
||||||
traced *traced.TraceCollector
|
traced *traced.TraceCollector
|
||||||
}
|
}
|
||||||
|
|
||||||
type PubsubTracerMsg struct {
|
|
||||||
Tracer string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr *PubsubTracer) Stop() error {
|
func (tr *PubsubTracer) Stop() error {
|
||||||
tr.traced.Stop()
|
tr.traced.Stop()
|
||||||
return tr.host.Close()
|
return tr.host.Close()
|
||||||
@ -63,7 +53,7 @@ func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
|
|||||||
t.RecordMessage("I am %s", tracedMultiaddrStr)
|
t.RecordMessage("I am %s", tracedMultiaddrStr)
|
||||||
|
|
||||||
_ = ma.StringCast(tracedMultiaddrStr)
|
_ = ma.StringCast(tracedMultiaddrStr)
|
||||||
tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddrStr}
|
tracedMsg := &PubsubTracerMsg{Multiaddr: tracedMultiaddrStr}
|
||||||
t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg)
|
t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg)
|
||||||
|
|
||||||
t.RecordMessage("waiting for all nodes to be ready")
|
t.RecordMessage("waiting for all nodes to be ready")
|
27
lotus-soup/roles.go
Normal file
27
lotus-soup/roles.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// This is the baseline test; Filecoin 101.
|
||||||
|
//
|
||||||
|
// A network with a bootstrapper, a number of miners, and a number of clients/full nodes
|
||||||
|
// is constructed and connected through the bootstrapper.
|
||||||
|
// Some funds are allocated to each node and a number of sectors are presealed in the genesis block.
|
||||||
|
//
|
||||||
|
// The test plan:
|
||||||
|
// One or more clients store content to one or more miners, testing storage deals.
|
||||||
|
// The plan ensures that the storage deals hit the blockchain and measure the time it took.
|
||||||
|
// Verification: one or more clients retrieve and verify the hashes of stored content.
|
||||||
|
// The plan ensures that all (previously) published content can be correctly retrieved
|
||||||
|
// and measures the time it took.
|
||||||
|
//
|
||||||
|
// Preparation of the genesis block: this is the responsibility of the bootstrapper.
|
||||||
|
// In order to compute the genesis block, we need to collect identities and presealed
|
||||||
|
// sectors from each node.
|
||||||
|
// Then we create a genesis block that allocates some funds to each node and collects
|
||||||
|
// the presealed sectors.
|
||||||
|
var basicRoles = map[string]func(*TestEnvironment) error{
|
||||||
|
"bootstrapper": runBootstrapper,
|
||||||
|
"miner": runMiner,
|
||||||
|
"client": runBaselineClient,
|
||||||
|
"drand": runDrandNode,
|
||||||
|
"pubsub-tracer": runPubsubTracer,
|
||||||
|
}
|
55
lotus-soup/sync.go
Normal file
55
lotus-soup/sync.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/testground/sdk-go/sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
genesisTopic = sync.NewTopic("genesis", &GenesisMsg{})
|
||||||
|
balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
|
||||||
|
presealTopic = sync.NewTopic("preseal", &PresealMsg{})
|
||||||
|
clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{})
|
||||||
|
minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddressesMsg{})
|
||||||
|
pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{})
|
||||||
|
drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{})
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
stateReady = sync.State("ready")
|
||||||
|
stateDone = sync.State("done")
|
||||||
|
stateStopMining = sync.State("stop-mining")
|
||||||
|
stateMinerPickSeqNum = sync.State("miner-pick-seq-num")
|
||||||
|
)
|
||||||
|
|
||||||
|
type InitialBalanceMsg struct {
|
||||||
|
Addr address.Address
|
||||||
|
Balance int
|
||||||
|
}
|
||||||
|
|
||||||
|
type PresealMsg struct {
|
||||||
|
Miner genesis.Miner
|
||||||
|
Seqno int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type GenesisMsg struct {
|
||||||
|
Genesis []byte
|
||||||
|
Bootstrapper []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type MinerAddressesMsg struct {
|
||||||
|
PeerAddr peer.AddrInfo
|
||||||
|
ActorAddr address.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
type PubsubTracerMsg struct {
|
||||||
|
Multiaddr string
|
||||||
|
}
|
||||||
|
|
||||||
|
type DrandRuntimeInfo struct {
|
||||||
|
Config dtypes.DrandConfig
|
||||||
|
GossipBootstrap dtypes.DrandBootstrap
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user