merge testkit into plan/drand-halting

This commit is contained in:
Yusef Napora 2020-07-01 12:54:46 -04:00
commit a8841a8b76
16 changed files with 599 additions and 499 deletions

View File

@ -1,28 +1,120 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"io/ioutil"
"math/rand"
"os"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/testground/sdk-go/run" "github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
"github.com/filecoin-project/oni/lotus-soup/testkit"
) )
var testplans = map[string]interface{}{ var cases = map[string]interface{}{
"lotus-baseline": doRun(basicRoles), "deals-e2e": testkit.WrapTestEnvironment(dealsE2E),
"drand-halting": doRun(basicRoles), "drand-halting": testkit.WrapTestEnvironment(dealsE2E),
} }
func main() { func main() {
run.InvokeMap(testplans) run.InvokeMap(cases)
} }
func doRun(roles map[string]func(*TestEnvironment) error) run.InitializedTestCaseFn { // This is the baseline test; Filecoin 101.
return func(runenv *runtime.RunEnv, initCtx *run.InitContext) error { //
role := runenv.StringParam("role") // A network with a bootstrapper, a number of miners, and a number of clients/full nodes
proc, ok := roles[role] // is constructed and connected through the bootstrapper.
if ok { // Some funds are allocated to each node and a number of sectors are presealed in the genesis block.
return proc(&TestEnvironment{RunEnv: runenv, InitContext: initCtx}) //
// The test plan:
// One or more clients store content to one or more miners, testing storage deals.
// The plan ensures that the storage deals hit the blockchain and measure the time it took.
// Verification: one or more clients retrieve and verify the hashes of stored content.
// The plan ensures that all (previously) published content can be correctly retrieved
// and measures the time it took.
//
// Preparation of the genesis block: this is the responsibility of the bootstrapper.
// In order to compute the genesis block, we need to collect identities and presealed
// sectors from each node.
// Then we create a genesis block that allocates some funds to each node and collects
// the presealed sectors.
func dealsE2E(t *testkit.TestEnvironment) error {
// Dispatch/forward non-client roles to defaults.
if t.Role != "client" {
return testkit.HandleDefaultRole(t)
} }
return fmt.Errorf("Unknown role: %s", role)
cl, err := testkit.PrepareClient(t)
if err != nil {
return err
} }
// This is a client role
t.RecordMessage("running client")
ctx := context.Background()
client := cl.FullApi
// select a random miner
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
return err
}
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
time.Sleep(2 * time.Second)
// generate 1600 bytes of random data
data := make([]byte, 1600)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
file, err := ioutil.TempFile("/tmp", "data")
if err != nil {
return err
}
defer os.Remove(file.Name())
_, err = file.Write(data)
if err != nil {
return err
}
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
if err != nil {
return err
}
t.RecordMessage("file cid: %s", fcid)
// start deal
t1 := time.Now()
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal to be sealed")
testkit.WaitDealSealed(t, ctx, client, deal)
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
carExport := true
t.RecordMessage("trying to retrieve %s", fcid)
testkit.RetrieveData(t, ctx, err, client, fcid, carExport, data)
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining)
time.Sleep(10 * time.Second) // wait for metrics to be emitted
// TODO broadcast published content CIDs to other clients
// TODO select a random piece of content published by some other client and retrieve it
t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount)
return nil
} }

View File

@ -17,7 +17,7 @@ enabled = true
enabled = true enabled = true
[[testcases]] [[testcases]]
name = "lotus-baseline" name = "deals-e2e"
instances = { min = 1, max = 100, default = 5 } instances = { min = 1, max = 100, default = 5 }
[testcases.params] [testcases.params]

View File

@ -1,194 +0,0 @@
package main
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"time"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/crypto"
)
func runBaselineClient(t *TestEnvironment) error {
t.RecordMessage("running client")
cl, err := prepareClient(t)
if err != nil {
return err
}
ctx := context.Background()
addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
if err != nil {
return err
}
t.RecordMessage("got %v miner addrs", len(addrs))
client := cl.fullApi
// select a random miner
minerAddr := addrs[rand.Intn(len(addrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
return err
}
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
time.Sleep(2 * time.Second)
// generate 1600 bytes of random data
data := make([]byte, 1600)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
file, err := ioutil.TempFile("/tmp", "data")
if err != nil {
return err
}
defer os.Remove(file.Name())
_, err = file.Write(data)
if err != nil {
return err
}
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
if err != nil {
return err
}
t.RecordMessage("file cid: %s", fcid)
// start deal
t1 := time.Now()
deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal to be sealed")
waitDealSealed(t, ctx, client, deal)
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
carExport := true
t.RecordMessage("trying to retrieve %s", fcid)
retrieveData(t, ctx, err, client, fcid, carExport, data)
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
t.SyncClient.MustSignalEntry(ctx, stateStopMining)
time.Sleep(10 * time.Second) // wait for metrics to be emitted
// TODO broadcast published content CIDs to other clients
// TODO select a random piece of content published by some other client and retrieve it
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
return nil
}
func prepareClient(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
pubsubTracer, err := getPubsubTracerMaddr(ctx, t)
if err != nil {
return nil, err
}
drandOpt, err := getDrandOpts(ctx, t)
if err != nil {
return nil, err
}
// first create a wallet
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
if err != nil {
return nil, err
}
// publish the account ID/balance
balance := t.IntParam("balance")
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
// then collect the genesis block and bootstrapper address
genesisMsg, err := waitForGenesis(t, ctx)
if err != nil {
return nil, err
}
clientIP := t.NetClient.MustGetDataNetworkIP().String()
nodeRepo := repo.NewMemory(nil)
// create the node
n := &Node{}
stop, err := node.New(context.Background(),
node.FullAPI(&n.fullApi),
node.Online(),
node.Repo(nodeRepo),
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
withGenesis(genesisMsg.Genesis),
withListenAddress(clientIP),
withBootstrapper(genesisMsg.Bootstrapper),
withPubsubConfig(false, pubsubTracer),
drandOpt,
)
if err != nil {
return nil, err
}
n.stop = stop
// set the wallet
err = n.setWallet(ctx, walletKey)
if err != nil {
stop(context.TODO())
return nil, err
}
err = startClientAPIServer(nodeRepo, n.fullApi)
if err != nil {
return nil, err
}
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
t.RecordMessage("publish our address to the clients addr topic")
addrinfo, err := n.fullApi.NetAddrsListen(ctx)
if err != nil {
return nil, err
}
t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo)
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
return n, nil
}
func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api))
ah := &auth.Handler{
Verify: api.AuthVerify,
Next: rpcServer.ServeHTTP,
}
http.Handle("/rpc/v0", ah)
srv := &http.Server{Handler: http.DefaultServeMux}
return startServer(repo, srv)
}

View File

@ -1,27 +0,0 @@
package main
// This is the baseline test; Filecoin 101.
//
// A network with a bootstrapper, a number of miners, and a number of clients/full nodes
// is constructed and connected through the bootstrapper.
// Some funds are allocated to each node and a number of sectors are presealed in the genesis block.
//
// The test plan:
// One or more clients store content to one or more miners, testing storage deals.
// The plan ensures that the storage deals hit the blockchain and measure the time it took.
// Verification: one or more clients retrieve and verify the hashes of stored content.
// The plan ensures that all (previously) published content can be correctly retrieved
// and measures the time it took.
//
// Preparation of the genesis block: this is the responsibility of the bootstrapper.
// In order to compute the genesis block, we need to collect identities and presealed
// sectors from each node.
// Then we create a genesis block that allocates some funds to each node and collects
// the presealed sectors.
var basicRoles = map[string]func(*TestEnvironment) error{
"bootstrapper": runBootstrapper,
"miner": runMiner,
"client": runBaselineClient,
"drand": runDrandNode,
"pubsub-tracer": runPubsubTracer,
}

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"context" "context"
@ -12,7 +12,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
) )
func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid { func StartDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid {
addr, err := client.WalletDefaultAddress(ctx) addr, err := client.WalletDefaultAddress(ctx)
if err != nil { if err != nil {
panic(err) panic(err)
@ -31,7 +31,7 @@ func startDeal(ctx context.Context, minerActorAddr address.Address, client api.F
return deal return deal
} }
func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) { func WaitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) {
loop: loop:
for { for {
di, err := client.ClientGetDealInfo(ctx, *deal) di, err := client.ClientGetDealInfo(ctx, *deal)

View File

@ -0,0 +1,55 @@
package testkit
import "fmt"
type RoleName = string
var DefaultRoles = map[RoleName]func(*TestEnvironment) error{
"bootstrapper": func(t *TestEnvironment) error {
b, err := PrepareBootstrapper(t)
if err != nil {
return err
}
return b.RunDefault()
},
"miner": func(t *TestEnvironment) error {
m, err := PrepareMiner(t)
if err != nil {
return err
}
return m.RunDefault()
},
"client": func(t *TestEnvironment) error {
c, err := PrepareClient(t)
if err != nil {
return err
}
return c.RunDefault()
},
"drand": func(t *TestEnvironment) error {
d, err := PrepareDrandInstance(t)
if err != nil {
return err
}
return d.RunDefault()
},
"pubsub-tracer": func(t *TestEnvironment) error {
tr, err := PreparePubsubTracer(t)
if err != nil {
return err
}
return tr.RunDefault()
},
}
// HandleDefaultRole handles a role by running its default behaviour.
//
// This function is suitable to forward to when a test case doesn't need to
// explicitly handle/alter a role.
func HandleDefaultRole(t *TestEnvironment) error {
f, ok := DefaultRoles[t.Role]
if !ok {
panic(fmt.Sprintf("unrecognized role: %s", t.Role))
}
return f(t)
}

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"context" "context"
@ -6,10 +6,8 @@ import (
"net/http" "net/http"
"os" "os"
"sort" "sort"
"strings"
"time" "time"
"github.com/davecgh/go-spew/spew"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon"
@ -29,16 +27,14 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
) )
func init() { func init() {
logging.SetLogLevel("*", "ERROR") _ = logging.SetLogLevel("*", "ERROR")
os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
build.InsecurePoStValidation = true build.InsecurePoStValidation = true
build.DisableBuiltinAssets = true build.DisableBuiltinAssets = true
@ -52,42 +48,20 @@ func init() {
var PrepareNodeTimeout = time.Minute var PrepareNodeTimeout = time.Minute
type TestEnvironment struct { type LotusNode struct {
*runtime.RunEnv FullApi api.FullNode
*run.InitContext MinerApi api.StorageMiner
} StopFn node.StopFunc
// workaround for default params being wrapped in quote chars
func (t *TestEnvironment) StringParam(name string) string {
return strings.Trim(t.RunEnv.StringParam(name), "\"")
}
func (t *TestEnvironment) DurationParam(name string) time.Duration {
d, err := time.ParseDuration(t.StringParam(name))
if err != nil {
panic(fmt.Errorf("invalid duration value for param '%s': %w", name, err))
}
return d
}
func (t *TestEnvironment) DebugSpew(format string, args ...interface{}) {
t.RecordMessage(spew.Sprintf(format, args...))
}
type Node struct {
fullApi api.FullNode
minerApi api.StorageMiner
stop node.StopFunc
MineOne func(context.Context, func(bool)) error MineOne func(context.Context, func(bool)) error
} }
func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error { func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error {
_, err := n.fullApi.WalletImport(ctx, &walletKey.KeyInfo) _, err := n.FullApi.WalletImport(ctx, &walletKey.KeyInfo)
if err != nil { if err != nil {
return err return err
} }
err = n.fullApi.WalletSetDefault(ctx, walletKey.Address) err = n.FullApi.WalletSetDefault(ctx, walletKey.Address)
if err != nil { if err != nil {
return err return err
} }
@ -95,9 +69,9 @@ func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error {
return nil return nil
} }
func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) { func WaitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
ch := make(chan *InitialBalanceMsg) ch := make(chan *InitialBalanceMsg)
sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch) sub := t.SyncClient.MustSubscribe(ctx, BalanceTopic, ch)
balances := make([]*InitialBalanceMsg, 0, nodes) balances := make([]*InitialBalanceMsg, 0, nodes)
for i := 0; i < nodes; i++ { for i := 0; i < nodes; i++ {
@ -112,9 +86,9 @@ func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*Ini
return balances, nil return balances, nil
} }
func collectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) { func CollectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) {
ch := make(chan *PresealMsg) ch := make(chan *PresealMsg)
sub := t.SyncClient.MustSubscribe(ctx, presealTopic, ch) sub := t.SyncClient.MustSubscribe(ctx, PresealTopic, ch)
preseals := make([]*PresealMsg, 0, miners) preseals := make([]*PresealMsg, 0, miners)
for i := 0; i < miners; i++ { for i := 0; i < miners; i++ {
@ -133,9 +107,9 @@ func collectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*Pr
return preseals, nil return preseals, nil
} }
func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) { func WaitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) {
genesisCh := make(chan *GenesisMsg) genesisCh := make(chan *GenesisMsg)
sub := t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) sub := t.SyncClient.MustSubscribe(ctx, GenesisTopic, genesisCh)
select { select {
case genesisMsg := <-genesisCh: case genesisMsg := <-genesisCh:
@ -145,9 +119,9 @@ func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error
} }
} }
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) { func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) {
ch := make(chan MinerAddressesMsg) ch := make(chan MinerAddressesMsg)
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch) sub := t.SyncClient.MustSubscribe(ctx, MinersAddrsTopic, ch)
addrs := make([]MinerAddressesMsg, 0, miners) addrs := make([]MinerAddressesMsg, 0, miners)
for i := 0; i < miners; i++ { for i := 0; i < miners; i++ {
@ -162,9 +136,9 @@ func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]M
return addrs, nil return addrs, nil
} }
func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) { func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
ch := make(chan peer.AddrInfo) ch := make(chan peer.AddrInfo)
sub := t.SyncClient.MustSubscribe(ctx, clientsAddrsTopic, ch) sub := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, ch)
addrs := make([]peer.AddrInfo, 0, clients) addrs := make([]peer.AddrInfo, 0, clients)
for i := 0; i < clients; i++ { for i := 0; i < clients; i++ {
@ -179,13 +153,13 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([
return addrs, nil return addrs, nil
} }
func getPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) { func GetPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) {
if !t.BooleanParam("enable_pubsub_tracer") { if !t.BooleanParam("enable_pubsub_tracer") {
return "", nil return "", nil
} }
ch := make(chan *PubsubTracerMsg) ch := make(chan *PubsubTracerMsg)
sub := t.SyncClient.MustSubscribe(ctx, pubsubTracerTopic, ch) sub := t.SyncClient.MustSubscribe(ctx, PubsubTracerTopic, ch)
select { select {
case m := <-ch: case m := <-ch:
@ -195,7 +169,7 @@ func getPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, erro
} }
} }
func getDrandOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) { func GetRandomBeaconOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) {
beaconType := t.StringParam("random_beacon_type") beaconType := t.StringParam("random_beacon_type")
switch beaconType { switch beaconType {
case "external-drand": case "external-drand":
@ -211,7 +185,7 @@ func getDrandOpts(ctx context.Context, t *TestEnvironment) (node.Option, error)
return nil, err return nil, err
} }
t.DebugSpew("setting drand config: %v", cfg) t.RecordMessage("setting drand config: %v", cfg)
return node.Options( return node.Options(
node.Override(new(dtypes.DrandConfig), cfg.Config), node.Override(new(dtypes.DrandConfig), cfg.Config),
node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap), node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap),

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"bytes" "bytes"
@ -19,7 +19,7 @@ import (
"github.com/ipld/go-car" "github.com/ipld/go-car"
) )
func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) { func RetrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) {
t1 := time.Now() t1 := time.Now()
offers, err := client.ClientFindData(ctx, fcid) offers, err := client.ClientFindData(ctx, fcid)
if err != nil { if err != nil {
@ -62,7 +62,7 @@ func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api
} }
if carExport { if carExport {
rdata = extractCarData(ctx, rdata, rpath) rdata = ExtractCarData(ctx, rdata, rpath)
} }
if !bytes.Equal(rdata, data) { if !bytes.Equal(rdata, data) {
@ -72,7 +72,7 @@ func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api
t.RecordMessage("retrieved successfully") t.RecordMessage("retrieved successfully")
} }
func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte { func ExtractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
bserv := dstest.Bserv() bserv := dstest.Bserv()
ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata)) ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata))
if err != nil { if err != nil {

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"bytes" "bytes"
@ -19,45 +19,43 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
func runBootstrapper(t *TestEnvironment) error { // Bootstrapper is a special kind of process that produces a genesis block with
t.RecordMessage("running bootstrapper") // the initial wallet balances and preseals for all enlisted miners and clients.
_, err := prepareBootstrapper(t) type Bootstrapper struct {
if err != nil { *LotusNode
return err
t *TestEnvironment
} }
ctx := context.Background() func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) {
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) var (
return nil clients = t.IntParam("clients")
} miners = t.IntParam("miners")
nodes = clients + miners
)
func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel() defer cancel()
pubsubTracer, err := getPubsubTracerMaddr(ctx, t) pubsubTracerMaddr, err := GetPubsubTracerMaddr(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
clients := t.IntParam("clients") randomBeaconOpt, err := GetRandomBeaconOpts(ctx, t)
miners := t.IntParam("miners")
nodes := clients + miners
drandOpt, err := getDrandOpts(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// the first duty of the boostrapper is to construct the genesis block // the first duty of the boostrapper is to construct the genesis block
// first collect all client and miner balances to assign initial funds // first collect all client and miner balances to assign initial funds
balances, err := waitForBalances(t, ctx, nodes) balances, err := WaitForBalances(t, ctx, nodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// then collect all preseals from miners // then collect all preseals from miners
preseals, err := collectPreseals(t, ctx, miners) preseals, err := CollectPreseals(t, ctx, miners)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -101,26 +99,26 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String() bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String()
n := &Node{} n := &LotusNode{}
stop, err := node.New(context.Background(), stop, err := node.New(context.Background(),
node.FullAPI(&n.fullApi), node.FullAPI(&n.FullApi),
node.Online(), node.Online(),
node.Repo(repo.NewMemory(nil)), node.Repo(repo.NewMemory(nil)),
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)), node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
withListenAddress(bootstrapperIP), withListenAddress(bootstrapperIP),
withBootstrapper(nil), withBootstrapper(nil),
withPubsubConfig(true, pubsubTracer), withPubsubConfig(true, pubsubTracerMaddr),
drandOpt, randomBeaconOpt,
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.stop = stop n.StopFn = stop
var bootstrapperAddr ma.Multiaddr var bootstrapperAddr ma.Multiaddr
bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx) bootstrapperAddrs, err := n.FullApi.NetAddrsListen(ctx)
if err != nil { if err != nil {
stop(context.TODO()) stop(context.TODO())
return nil, err return nil, err
@ -152,10 +150,18 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
Genesis: genesisBuffer.Bytes(), Genesis: genesisBuffer.Bytes(),
Bootstrapper: bootstrapperAddr.Bytes(), Bootstrapper: bootstrapperAddr.Bytes(),
} }
t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg) t.SyncClient.MustPublish(ctx, GenesisTopic, genesisMsg)
t.RecordMessage("waiting for all nodes to be ready") t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return n, nil return &Bootstrapper{n, t}, nil
}
// RunDefault runs a default bootstrapper.
func (b *Bootstrapper) RunDefault() error {
b.t.RecordMessage("running bootstrapper")
ctx := context.Background()
b.t.SyncClient.MustSignalAndWait(ctx, StateDone, b.t.TestInstanceCount)
return nil
} }

View File

@ -0,0 +1,139 @@
package testkit
import (
"context"
"fmt"
"net/http"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/crypto"
)
type LotusClient struct {
*LotusNode
t *TestEnvironment
MinerAddrs []MinerAddressesMsg
}
func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
if err != nil {
return nil, err
}
drandOpt, err := GetRandomBeaconOpts(ctx, t)
if err != nil {
return nil, err
}
// first create a wallet
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
if err != nil {
return nil, err
}
// publish the account ID/balance
balance := t.IntParam("balance")
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
t.SyncClient.Publish(ctx, BalanceTopic, balanceMsg)
// then collect the genesis block and bootstrapper address
genesisMsg, err := WaitForGenesis(t, ctx)
if err != nil {
return nil, err
}
clientIP := t.NetClient.MustGetDataNetworkIP().String()
nodeRepo := repo.NewMemory(nil)
// create the node
n := &LotusNode{}
stop, err := node.New(context.Background(),
node.FullAPI(&n.FullApi),
node.Online(),
node.Repo(nodeRepo),
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
withGenesis(genesisMsg.Genesis),
withListenAddress(clientIP),
withBootstrapper(genesisMsg.Bootstrapper),
withPubsubConfig(false, pubsubTracer),
drandOpt,
)
if err != nil {
return nil, err
}
n.StopFn = stop
// set the wallet
err = n.setWallet(ctx, walletKey)
if err != nil {
_ = stop(context.TODO())
return nil, err
}
err = startClientAPIServer(nodeRepo, n.FullApi)
if err != nil {
return nil, err
}
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
t.RecordMessage("publish our address to the clients addr topic")
addrinfo, err := n.FullApi.NetAddrsListen(ctx)
if err != nil {
return nil, err
}
t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, addrinfo)
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
// collect miner addresses.
addrs, err := CollectMinerAddrs(t, ctx, t.IntParam("miners"))
if err != nil {
return nil, err
}
t.RecordMessage("got %v miner addrs", len(addrs))
cl := &LotusClient{
t: t,
LotusNode: n,
MinerAddrs: addrs,
}
return cl, nil
}
func (c *LotusClient) RunDefault() error {
// run forever
c.t.RecordMessage("running default client forever")
c.t.WaitUntilAllDone()
return nil
}
func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api))
ah := &auth.Handler{
Verify: api.AuthVerify,
Next: rpcServer.ServeHTTP,
}
http.Handle("/rpc/v0", ah)
srv := &http.Server{Handler: http.DefaultServeMux}
return startServer(repo, srv)
}

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"bytes" "bytes"
@ -105,14 +105,13 @@ func (dr *DrandInstance) ctrl() *dnet.ControlClient {
return cl return cl
} }
func (dr *DrandInstance) RunDKG(nodes, thr int, timeout string, leader bool, leaderAddr string, beaconOffset int) *key.Group { func (dr *DrandInstance) RunDKG(nodes, thr int, period time.Duration, timeout string, leader bool, leaderAddr string, beaconOffset int) *key.Group {
cl := dr.ctrl() cl := dr.ctrl()
p := dr.t.DurationParam("drand_period")
t, _ := time.ParseDuration(timeout) t, _ := time.ParseDuration(timeout)
var grp *drand.GroupPacket var grp *drand.GroupPacket
var err error var err error
if leader { if leader {
grp, err = cl.InitDKGLeader(nodes, thr, p, t, nil, secretDKG, beaconOffset) grp, err = cl.InitDKGLeader(nodes, thr, period, t, nil, secretDKG, beaconOffset)
} else { } else {
leader := dnet.CreatePeer(leaderAddr, false) leader := dnet.CreatePeer(leaderAddr, false)
grp, err = cl.InitDKG(leader, nil, secretDKG) grp, err = cl.InitDKG(leader, nil, secretDKG)
@ -160,46 +159,45 @@ func (dr *DrandInstance) Resume() {
} }
} }
func runDrandNode(t *TestEnvironment) error {
t.RecordMessage("running drand node") func (dr *DrandInstance) RunDefault() error {
dr, err := prepareDrandNode(t) dr.t.RecordMessage("running drand node")
if err != nil {
return err
}
defer dr.Close() defer dr.Close()
ctx := context.Background() if dr.t.IsParamSet("suspend_events") {
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) suspender := statemachine.NewSuspender(dr, dr.t.RecordMessage)
suspender.RunEvents(dr.t.StringParam("suspend_events"))
if t.IsParamSet("suspend_events") {
suspender := statemachine.NewSuspender(dr, t.RecordMessage)
suspender.RunEvents(t.StringParam("suspend_events"))
} }
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) dr.t.WaitUntilAllDone()
return nil return nil
} }
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group. // PrepareDrandInstance starts a drand instance and runs a DKG with the other
// Once the chain is running, the leader publishes the chain info needed by lotus nodes on // members of the composition group.
// drandConfigTopic //
func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) { // Once the chain is running, the leader publishes the chain info needed by
// lotus nodes on DrandConfigTopic.
func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) {
var (
startTime = time.Now()
seq = t.GroupSeq
isLeader = seq == 1
nNodes = t.TestGroupInstanceCount
myAddr = t.NetClient.MustGetDataNetworkIP()
period = t.DurationParam("drand_period")
threshold = t.IntParam("drand_threshold")
runGossipRelay = t.BooleanParam("drand_gossip_relay")
beaconOffset = 3
)
ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout)
defer cancel() defer cancel()
startTime := time.Now()
seq := t.GroupSeq
isLeader := seq == 1
nNodes := t.TestGroupInstanceCount
myAddr := t.NetClient.MustGetDataNetworkIP()
threshold := t.IntParam("drand_threshold")
runGossipRelay := t.BooleanParam("drand_gossip_relay")
beaconOffset := 3
stateDir, err := ioutil.TempDir("/tmp", fmt.Sprintf("drand-%d", t.GroupSeq)) stateDir, err := ioutil.TempDir("/tmp", fmt.Sprintf("drand-%d", t.GroupSeq))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -220,6 +218,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
PublicAddr string PublicAddr string
IsLeader bool IsLeader bool
} }
addrTopic := sync.NewTopic("drand-addrs", &NodeAddr{}) addrTopic := sync.NewTopic("drand-addrs", &NodeAddr{})
var publicAddrs []string var publicAddrs []string
var leaderAddr string var leaderAddr string
@ -229,6 +228,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
PublicAddr: dr.pubAddr, PublicAddr: dr.pubAddr,
IsLeader: isLeader, IsLeader: isLeader,
}, ch) }, ch)
for i := 0; i < nNodes; i++ { for i := 0; i < nNodes; i++ {
select { select {
case msg := <-ch: case msg := <-ch:
@ -240,6 +240,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
return nil, fmt.Errorf("unable to read drand addrs from sync service: %w", err) return nil, fmt.Errorf("unable to read drand addrs from sync service: %w", err)
} }
} }
if leaderAddr == "" { if leaderAddr == "" {
return nil, fmt.Errorf("got %d drand addrs, but no leader", len(publicAddrs)) return nil, fmt.Errorf("got %d drand addrs, but no leader", len(publicAddrs))
} }
@ -270,7 +271,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
if !isLeader { if !isLeader {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
grp := dr.RunDKG(nNodes, threshold, "10s", isLeader, leaderAddr, beaconOffset) grp := dr.RunDKG(nNodes, threshold, period, "10s", isLeader, leaderAddr, beaconOffset)
if grp == nil { if grp == nil {
return nil, fmt.Errorf("drand dkg failed") return nil, fmt.Errorf("drand dkg failed")
} }
@ -348,10 +349,12 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
}, },
GossipBootstrap: relayAddrs, GossipBootstrap: relayAddrs,
} }
t.DebugSpew("publishing drand config on sync topic: %v", cfg) t.DebugSpew("publishing drand config on sync topic: %#v", cfg)
t.SyncClient.MustPublish(ctx, drandConfigTopic, &cfg) t.SyncClient.MustPublish(ctx, DrandConfigTopic, &cfg)
} }
// signal that we're ready to start the test
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return &dr, nil return &dr, nil
} }
@ -359,7 +362,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
// you can use the returned dtypes.DrandConfig to override the default production config. // you can use the returned dtypes.DrandConfig to override the default production config.
func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) { func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) {
ch := make(chan *DrandRuntimeInfo, 1) ch := make(chan *DrandRuntimeInfo, 1)
sub := client.MustSubscribe(ctx, drandConfigTopic, ch) sub := client.MustSubscribe(ctx, DrandConfigTopic, ch)
select { select {
case cfg := <-ch: case cfg := <-ch:
return cfg, nil return cfg, nil

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"context" "context"
@ -24,101 +24,33 @@ import (
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/gorilla/mux"
libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner" saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/gorilla/mux"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/testground/sdk-go/sync" "github.com/testground/sdk-go/sync"
) )
func runMiner(t *TestEnvironment) error { type LotusMiner struct {
t.RecordMessage("running miner") *LotusNode
miner, err := prepareMiner(t)
if err != nil { t *TestEnvironment
return err
} }
t.RecordMessage("block delay: %v", build.BlockDelay) func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
t.D().Gauge("miner.block-delay").Update(build.BlockDelay)
ctx := context.Background()
clients := t.IntParam("clients")
miners := t.IntParam("miners")
myActorAddr, err := miner.minerApi.ActorAddress(ctx)
if err != nil {
return err
}
// mine / stop mining
mine := true
done := make(chan struct{})
if miner.MineOne != nil {
go func() {
defer t.RecordMessage("shutting down mining")
defer close(done)
var i int
for i = 0; mine; i++ {
// synchronize all miners to mine the next block
t.RecordMessage("synchronizing all miners to mine next block [%d]", i)
stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i))
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
ch := make(chan struct{})
err := miner.MineOne(ctx, func(mined bool) {
if mined {
t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1)
}
close(ch)
})
if err != nil {
panic(err)
}
<-ch
}
// signal the last block to make sure no miners are left stuck waiting for the next block signal
// while the others have stopped
stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i))
t.SyncClient.MustSignalEntry(ctx, stateMineLast)
}()
} else {
close(done)
}
// wait for a signal from all clients to stop mining
err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C
if err != nil {
return err
}
mine = false
<-done
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
return nil
}
func prepareMiner(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel() defer cancel()
pubsubTracer, err := getPubsubTracerMaddr(ctx, t) pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
drandOpt, err := getDrandOpts(ctx, t) drandOpt, err := GetRandomBeaconOpts(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -132,10 +64,10 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
// publish the account ID/balance // publish the account ID/balance
balance := t.IntParam("balance") balance := t.IntParam("balance")
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance} balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) t.SyncClient.Publish(ctx, BalanceTopic, balanceMsg)
// create and publish the preseal commitment // create and publish the preseal commitment
priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader) priv, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -146,7 +78,7 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
} }
// pick unique sequence number for each miner, no matter in which group they are // pick unique sequence number for each miner, no matter in which group they are
seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners")) seq := t.SyncClient.MustSignalAndWait(ctx, StateMinerPickSeqNum, t.IntParam("miners"))
minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1)) minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1))
if err != nil { if err != nil {
@ -168,10 +100,10 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker) t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker)
presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq} presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq}
t.SyncClient.Publish(ctx, presealTopic, presealMsg) t.SyncClient.Publish(ctx, PresealTopic, presealMsg)
// then collect the genesis block and bootstrapper address // then collect the genesis block and bootstrapper address
genesisMsg, err := waitForGenesis(t, ctx) genesisMsg, err := WaitForGenesis(t, ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -229,12 +161,12 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
// create the node // create the node
// we need both a full node _and_ and storage miner node // we need both a full node _and_ and storage miner node
n := &Node{} n := &LotusNode{}
nodeRepo := repo.NewMemory(nil) nodeRepo := repo.NewMemory(nil)
stop1, err := node.New(context.Background(), stop1, err := node.New(context.Background(),
node.FullAPI(&n.fullApi), node.FullAPI(&n.FullApi),
node.Online(), node.Online(),
node.Repo(nodeRepo), node.Repo(nodeRepo),
withGenesis(genesisMsg.Genesis), withGenesis(genesisMsg.Genesis),
@ -255,10 +187,10 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
} }
minerOpts := []node.Option{ minerOpts := []node.Option{
node.StorageMiner(&n.minerApi), node.StorageMiner(&n.MinerApi),
node.Online(), node.Online(),
node.Repo(minerRepo), node.Repo(minerRepo),
node.Override(new(api.FullNode), n.fullApi), node.Override(new(api.FullNode), n.FullApi),
withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), withApiEndpoint("/ip4/127.0.0.1/tcp/1234"),
withMinerListenAddress(minerIP), withMinerListenAddress(minerIP),
} }
@ -282,7 +214,7 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
stop1(context.TODO()) stop1(context.TODO())
return nil, err return nil, err
} }
n.stop = func(ctx context.Context) error { n.StopFn = func(ctx context.Context) error {
// TODO use a multierror for this // TODO use a multierror for this
err2 := stop2(ctx) err2 := stop2(ctx)
err1 := stop1(ctx) err1 := stop1(ctx)
@ -295,25 +227,20 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
registerAndExportMetrics(minerAddr.String()) registerAndExportMetrics(minerAddr.String())
// Bootstrap with full node // Bootstrap with full node
remoteAddrs, err := n.fullApi.NetAddrsListen(ctx) remoteAddrs, err := n.FullApi.NetAddrsListen(ctx)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = n.minerApi.NetConnect(ctx, remoteAddrs) err = n.MinerApi.NetConnect(ctx, remoteAddrs)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = startStorMinerAPIServer(minerRepo, n.minerApi)
if err != nil {
return nil, err
}
// add local storage for presealed sectors // add local storage for presealed sectors
err = n.minerApi.StorageAddLocal(ctx, presealDir) err = n.MinerApi.StorageAddLocal(ctx, presealDir)
if err != nil { if err != nil {
n.stop(context.TODO()) n.StopFn(context.TODO())
return nil, err return nil, err
} }
@ -333,30 +260,105 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
GasLimit: 1000000, GasLimit: 1000000,
} }
_, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID) _, err = n.FullApi.MpoolPushMessage(ctx, changeMinerID)
if err != nil { if err != nil {
n.stop(context.TODO()) n.StopFn(context.TODO())
return nil, err return nil, err
} }
t.RecordMessage("publish our address to the miners addr topic") t.RecordMessage("publish our address to the miners addr topic")
actoraddress, err := n.minerApi.ActorAddress(ctx) actoraddress, err := n.MinerApi.ActorAddress(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
addrinfo, err := n.minerApi.NetAddrsListen(ctx) addrinfo, err := n.MinerApi.NetAddrsListen(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress}) t.SyncClient.MustPublish(ctx, MinersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress})
t.RecordMessage("waiting for all nodes to be ready") t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return n, err m := &LotusMiner{n, t}
err = m.startStorageMinerAPIServer(minerRepo, n.MinerApi)
if err != nil {
return nil, err
} }
func startStorMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error { return m, err
}
func (m *LotusMiner) RunDefault() error {
var (
t = m.t
clients = t.IntParam("clients")
miners = t.IntParam("miners")
)
t.RecordMessage("running miner")
t.RecordMessage("block delay: %v", build.BlockDelay)
t.D().Gauge("miner.block-delay").Update(build.BlockDelay)
ctx := context.Background()
myActorAddr, err := m.MinerApi.ActorAddress(ctx)
if err != nil {
return err
}
// mine / stop mining
mine := true
done := make(chan struct{})
if m.MineOne != nil {
go func() {
defer t.RecordMessage("shutting down mining")
defer close(done)
var i int
for i = 0; mine; i++ {
// synchronize all miners to mine the next block
t.RecordMessage("synchronizing all miners to mine next block [%d]", i)
stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i))
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
ch := make(chan struct{})
err := m.MineOne(ctx, func(mined bool) {
if mined {
t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1)
}
close(ch)
})
if err != nil {
panic(err)
}
<-ch
}
// signal the last block to make sure no miners are left stuck waiting for the next block signal
// while the others have stopped
stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i))
t.SyncClient.MustSignalEntry(ctx, stateMineLast)
}()
} else {
close(done)
}
// wait for a signal from all clients to stop mining
err = <-t.SyncClient.MustBarrier(ctx, StateStopMining, clients).C
if err != nil {
return err
}
mine = false
<-done
t.SyncClient.MustSignalAndWait(ctx, StateDone, t.TestInstanceCount)
return nil
}
func (m *LotusMiner) startStorageMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error {
mux := mux.NewRouter() mux := mux.NewRouter()
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"context" "context"
@ -14,16 +14,12 @@ import (
) )
type PubsubTracer struct { type PubsubTracer struct {
t *TestEnvironment
host host.Host host host.Host
traced *traced.TraceCollector traced *traced.TraceCollector
} }
func (tr *PubsubTracer) Stop() error { func PreparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
tr.traced.Stop()
return tr.host.Close()
}
func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
ctx := context.Background() ctx := context.Background()
privk, _, err := crypto.GenerateEd25519Key(rand.Reader) privk, _, err := crypto.GenerateEd25519Key(rand.Reader)
@ -54,29 +50,30 @@ func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
_ = ma.StringCast(tracedMultiaddrStr) _ = ma.StringCast(tracedMultiaddrStr)
tracedMsg := &PubsubTracerMsg{Multiaddr: tracedMultiaddrStr} tracedMsg := &PubsubTracerMsg{Multiaddr: tracedMultiaddrStr}
t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg) t.SyncClient.MustPublish(ctx, PubsubTracerTopic, tracedMsg)
t.RecordMessage("waiting for all nodes to be ready") t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return &PubsubTracer{host: host, traced: traced}, nil tracer := &PubsubTracer{t: t, host: host, traced: traced}
return tracer, nil
} }
func runPubsubTracer(t *TestEnvironment) error { func (tr *PubsubTracer) RunDefault() error {
t.RecordMessage("running pubsub tracer") tr.t.RecordMessage("running pubsub tracer")
tracer, err := preparePubsubTracer(t)
if err != nil {
return err
}
defer func() { defer func() {
err := tracer.Stop() err := tr.Stop()
if err != nil { if err != nil {
t.RecordMessage("error stoping tracer: %s", err) tr.t.RecordMessage("error stoping tracer: %s", err)
} }
}() }()
ctx := context.Background() tr.t.WaitUntilAllDone()
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
return nil return nil
} }
func (tr *PubsubTracer) Stop() error {
tr.traced.Stop()
return tr.host.Close()
}

View File

@ -1,4 +1,4 @@
package main package testkit
import ( import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -9,20 +9,20 @@ import (
) )
var ( var (
genesisTopic = sync.NewTopic("genesis", &GenesisMsg{}) GenesisTopic = sync.NewTopic("genesis", &GenesisMsg{})
balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{}) BalanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
presealTopic = sync.NewTopic("preseal", &PresealMsg{}) PresealTopic = sync.NewTopic("preseal", &PresealMsg{})
clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{}) ClientsAddrsTopic = sync.NewTopic("clients_addrs", &peer.AddrInfo{})
minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddressesMsg{}) MinersAddrsTopic = sync.NewTopic("miners_addrs", &MinerAddressesMsg{})
pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{}) PubsubTracerTopic = sync.NewTopic("pubsub_tracer", &PubsubTracerMsg{})
drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{}) DrandConfigTopic = sync.NewTopic("drand_config", &DrandRuntimeInfo{})
) )
var ( var (
stateReady = sync.State("ready") StateReady = sync.State("ready")
stateDone = sync.State("done") StateDone = sync.State("done")
stateStopMining = sync.State("stop-mining") StateStopMining = sync.State("stop-mining")
stateMinerPickSeqNum = sync.State("miner-pick-seq-num") StateMinerPickSeqNum = sync.State("miner-pick-seq-num")
) )
type InitialBalanceMsg struct { type InitialBalanceMsg struct {

View File

@ -0,0 +1,53 @@
package testkit
import (
"context"
"fmt"
"strings"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
)
type TestEnvironment struct {
*runtime.RunEnv
*run.InitContext
Role string
}
// workaround for default params being wrapped in quote chars
func (t *TestEnvironment) StringParam(name string) string {
return strings.Trim(t.RunEnv.StringParam(name), "\"")
}
func (t *TestEnvironment) DurationParam(name string) time.Duration {
d, err := time.ParseDuration(t.StringParam(name))
if err != nil {
panic(fmt.Errorf("invalid duration value for param '%s': %w", name, err))
}
return d
}
func (t *TestEnvironment) DebugSpew(format string, args... interface{}) {
t.RecordMessage(spew.Sprintf(format, args...))
}
// WaitUntilAllDone waits until all instances in the test case are done.
func (t *TestEnvironment) WaitUntilAllDone() {
ctx := context.Background()
t.SyncClient.MustSignalAndWait(ctx, StateDone, t.TestInstanceCount)
}
// WrapTestEnvironment takes a test case function that accepts a
// *TestEnvironment, and adapts it to the original unwrapped SDK style
// (run.InitializedTestCaseFn).
func WrapTestEnvironment(f func(t *TestEnvironment) error) run.InitializedTestCaseFn {
return func(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
t := &TestEnvironment{RunEnv: runenv, InitContext: initCtx}
t.Role = t.StringParam("role")
return f(t)
}
}