iterate on testkit library.

This commit is contained in:
Raúl Kripalani 2020-07-01 14:41:38 +01:00
parent 436fe158c9
commit 81f8451a23
11 changed files with 232 additions and 265 deletions

View File

@ -15,7 +15,7 @@ import (
)
var cases = map[string]interface{}{
"deals-e2e": dealsE2E(),
"deals-e2e": testkit.WrapTestEnvironment(dealsE2E),
}
func main() {
@ -40,76 +40,80 @@ func main() {
// sectors from each node.
// Then we create a genesis block that allocates some funds to each node and collects
// the presealed sectors.
func dealsE2E() run.InitializedTestCaseFn {
client := func(t *testkit.TestEnvironment, cl *testkit.LotusClient) error {
t.RecordMessage("running client")
ctx := context.Background()
client := cl.FullApi
// select a random miner
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
return err
}
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
time.Sleep(2 * time.Second)
// generate 1600 bytes of random data
data := make([]byte, 1600)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
file, err := ioutil.TempFile("/tmp", "data")
if err != nil {
return err
}
defer os.Remove(file.Name())
_, err = file.Write(data)
if err != nil {
return err
}
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
if err != nil {
return err
}
t.RecordMessage("file cid: %s", fcid)
// start deal
t1 := time.Now()
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal to be sealed")
testkit.WaitDealSealed(t, ctx, client, deal)
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
carExport := true
t.RecordMessage("trying to retrieve %s", fcid)
testkit.RetrieveData(t, ctx, err, client, fcid, carExport, data)
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining)
time.Sleep(10 * time.Second) // wait for metrics to be emitted
// TODO broadcast published content CIDs to other clients
// TODO select a random piece of content published by some other client and retrieve it
t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount)
return nil
func dealsE2E(t *testkit.TestEnvironment) error {
// Dispatch non-client roles to defaults.
if t.Role != "client" {
return testkit.RunDefaultRole(t)
}
testkit.MustOverrideRoleRun(testkit.RoleClient, client)
cl, err := testkit.PrepareClient(t)
if err != nil {
return err
}
return testkit.Entrypoint()
// This is a client role
t.RecordMessage("running client")
ctx := context.Background()
client := cl.FullApi
// select a random miner
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
return err
}
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
time.Sleep(2 * time.Second)
// generate 1600 bytes of random data
data := make([]byte, 1600)
rand.New(rand.NewSource(time.Now().UnixNano())).Read(data)
file, err := ioutil.TempFile("/tmp", "data")
if err != nil {
return err
}
defer os.Remove(file.Name())
_, err = file.Write(data)
if err != nil {
return err
}
fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false})
if err != nil {
return err
}
t.RecordMessage("file cid: %s", fcid)
// start deal
t1 := time.Now()
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal to be sealed")
testkit.WaitDealSealed(t, ctx, client, deal)
t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1)))
carExport := true
t.RecordMessage("trying to retrieve %s", fcid)
testkit.RetrieveData(t, ctx, err, client, fcid, carExport, data)
t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1)))
t.SyncClient.MustSignalEntry(ctx, testkit.StateStopMining)
time.Sleep(10 * time.Second) // wait for metrics to be emitted
// TODO broadcast published content CIDs to other clients
// TODO select a random piece of content published by some other client and retrieve it
t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount)
return nil
}

View File

@ -17,15 +17,15 @@ enabled = true
enabled = true
[[testcases]]
name = "lotus-baseline"
name = "deals-e2e"
instances = { min = 1, max = 100, default = 5 }
[testcases.params]
clients = { type = "int", default = 1 }
miners = { type = "int", default = 1 }
balance = { type = "int", default = 1 }
sectors = { type = "int", default = 1 }
role = { type = "string" }
clients = { type = "int", default = 1 }
miners = { type = "int", default = 1 }
balance = { type = "int", default = 1 }
sectors = { type = "int", default = 1 }
role = { type = "string" }
genesis_timestamp_offset = { type = "int", default = 0 }
@ -35,9 +35,9 @@ instances = { min = 1, max = 100, default = 5 }
# in the same composition group. There must be at least threshold drand nodes.
# To get lotus nodes to actually use the drand nodes, you must set random_beacon_type="local-drand"
# for the lotus node groups.
drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true }
drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true }
# Params relevant to pubsub tracing
enable_pubsub_tracer = { type = "bool", default = false }

View File

@ -0,0 +1,53 @@
package testkit
import "fmt"
type RoleName = string
var DefaultRoles = map[RoleName]func(*TestEnvironment) error{
"bootstrapper": func(t *TestEnvironment) error {
b, err := PrepareBootstrapper(t)
if err != nil {
return err
}
return b.RunDefault()
},
"miner": func(t *TestEnvironment) error {
m, err := PrepareMiner(t)
if err != nil {
return err
}
return m.RunDefault()
},
"client": func(t *TestEnvironment) error {
c, err := PrepareClient(t)
if err != nil {
return err
}
return c.RunDefault()
},
"drand": func(t *TestEnvironment) error {
d, err := PrepareDrandInstance(t)
if err != nil {
return err
}
return d.RunDefault()
},
"pubsub-tracer": func(t *TestEnvironment) error {
tr, err := PreparePubsubTracer(t)
if err != nil {
return err
}
return tr.RunDefault()
},
}
// RunDefaultRole runs a default role, extracted from the `role` RunEnv
// parameter.
func RunDefaultRole(t *TestEnvironment) error {
f, ok := DefaultRoles[t.Role]
if !ok {
panic(fmt.Sprintf("unrecognized role: %s", t.Role))
}
return f(t)
}

View File

@ -1,13 +0,0 @@
package testkit
import (
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
)
func Entrypoint() run.InitializedTestCaseFn {
return func(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
role := runenv.StringParam("role")
return ExecuteRole(Role(role), runenv, initCtx)
}
}

View File

@ -19,19 +19,15 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
func runBootstrapper(t *TestEnvironment, node *LotusNode) error {
t.RecordMessage("running bootstrapper")
_, err := prepareBootstrapper(t)
if err != nil {
return err
}
// Bootstrapper is a special kind of process that produces a genesis block with
// the initial wallet balances and preseals for all enlisted miners and clients.
type Bootstrapper struct {
*LotusNode
ctx := context.Background()
t.SyncClient.MustSignalAndWait(ctx, StateDone, t.TestInstanceCount)
return nil
t *TestEnvironment
}
func prepareBootstrapper(t *TestEnvironment) (*LotusNode, error) {
func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) {
var (
clients = t.IntParam("clients")
miners = t.IntParam("miners")
@ -159,5 +155,13 @@ func prepareBootstrapper(t *TestEnvironment) (*LotusNode, error) {
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return n, nil
return &Bootstrapper{n, t}, nil
}
// RunDefault runs a default bootstrapper.
func (b *Bootstrapper) RunDefault() error {
b.t.RecordMessage("running bootstrapper")
ctx := context.Background()
b.t.SyncClient.MustSignalAndWait(ctx, StateDone, b.t.TestInstanceCount)
return nil
}

View File

@ -19,10 +19,11 @@ import (
type LotusClient struct {
*LotusNode
t *TestEnvironment
MinerAddrs []MinerAddressesMsg
}
func prepareClient(t *TestEnvironment) (*LotusClient, error) {
func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
@ -107,16 +108,17 @@ func prepareClient(t *TestEnvironment) (*LotusClient, error) {
t.RecordMessage("got %v miner addrs", len(addrs))
cl := &LotusClient{
t: t,
LotusNode: n,
MinerAddrs: addrs,
}
return cl, nil
}
func runDefaultClient(t *TestEnvironment, _ *LotusClient) error {
func (c *LotusClient) RunDefault() error {
// run forever
t.RecordMessage("running default client forever")
t.WaitUntilAllDone()
c.t.RecordMessage("running default client forever")
c.t.WaitUntilAllDone()
return nil
}

View File

@ -26,6 +26,8 @@ import (
var PrepareDrandTimeout = time.Minute
type DrandInstance struct {
t *TestEnvironment
Node node.Node
GossipRelay *lp2p.GossipRelayNode
@ -36,35 +38,38 @@ func (d *DrandInstance) Cleanup() error {
return os.RemoveAll(d.stateDir)
}
func runDrandNode(t *TestEnvironment, dr *DrandInstance) error {
t.RecordMessage("running drand node")
defer dr.Cleanup()
func (d *DrandInstance) RunDefault() error {
d.t.RecordMessage("running drand node")
defer d.Cleanup()
// TODO add ability to halt / recover on demand
t.WaitUntilAllDone()
d.t.WaitUntilAllDone()
return nil
}
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group.
// Once the chain is running, the leader publishes the chain info needed by lotus nodes on
// drandConfigTopic
func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
// PrepareDrandInstance starts a drand instance and runs a DKG with the other
// members of the composition group.
//
// Once the chain is running, the leader publishes the chain info needed by
// lotus nodes on DrandConfigTopic.
func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) {
var (
startTime = time.Now()
seq = t.GroupSeq
isLeader = seq == 1
nNodes = t.TestGroupInstanceCount
myAddr = t.NetClient.MustGetDataNetworkIP()
period = t.DurationParam("drand_period")
threshold = t.IntParam("drand_threshold")
runGossipRelay = t.BooleanParam("drand_gossip_relay")
beaconOffset = 3
)
ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout)
defer cancel()
startTime := time.Now()
seq := t.GroupSeq
isLeader := seq == 1
nNodes := t.TestGroupInstanceCount
myAddr := t.NetClient.MustGetDataNetworkIP()
period := t.DurationParam("drand_period")
threshold := t.IntParam("drand_threshold")
runGossipRelay := t.BooleanParam("drand_gossip_relay")
beaconOffset := 3
stateDir, err := ioutil.TempDir("", fmt.Sprintf("drand-%d", t.GroupSeq))
if err != nil {
return nil, err
@ -80,6 +85,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
PublicAddr string
IsLeader bool
}
addrTopic := sync.NewTopic("drand-addrs", &NodeAddr{})
var publicAddrs []string
var leaderAddr string
@ -89,6 +95,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
PublicAddr: n.PublicAddr(),
IsLeader: isLeader,
}, ch)
for i := 0; i < nNodes; i++ {
select {
case msg := <-ch:
@ -100,6 +107,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
return nil, fmt.Errorf("unable to read drand addrs from sync service: %w", err)
}
}
if leaderAddr == "" {
return nil, fmt.Errorf("got %d drand addrs, but no leader", len(publicAddrs))
}
@ -215,6 +223,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
}
return &DrandInstance{
t: t,
Node: n,
GossipRelay: gossipRelay,
stateDir: stateDir,

View File

@ -37,9 +37,11 @@ import (
type LotusMiner struct {
*LotusNode
t *TestEnvironment
}
func prepareMiner(t *TestEnvironment) (*LotusMiner, error) {
func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
@ -235,11 +237,6 @@ func prepareMiner(t *TestEnvironment) (*LotusMiner, error) {
panic(err)
}
err = startStorageMinerAPIServer(minerRepo, n.MinerApi)
if err != nil {
return nil, err
}
// add local storage for presealed sectors
err = n.MinerApi.StorageAddLocal(ctx, presealDir)
if err != nil {
@ -283,11 +280,19 @@ func prepareMiner(t *TestEnvironment) (*LotusMiner, error) {
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return &LotusMiner{n}, err
m := &LotusMiner{n, t}
err = m.startStorageMinerAPIServer(minerRepo, n.MinerApi)
if err != nil {
return nil, err
}
return m, err
}
func runDefaultMiner(t *TestEnvironment, miner *LotusMiner) error {
func (m *LotusMiner) RunDefault() error {
var (
t = m.t
clients = t.IntParam("clients")
miners = t.IntParam("miners")
)
@ -297,7 +302,7 @@ func runDefaultMiner(t *TestEnvironment, miner *LotusMiner) error {
t.D().Gauge("miner.block-delay").Update(build.BlockDelay)
ctx := context.Background()
myActorAddr, err := miner.MinerApi.ActorAddress(ctx)
myActorAddr, err := m.MinerApi.ActorAddress(ctx)
if err != nil {
return err
}
@ -306,7 +311,7 @@ func runDefaultMiner(t *TestEnvironment, miner *LotusMiner) error {
mine := true
done := make(chan struct{})
if miner.MineOne != nil {
if m.MineOne != nil {
go func() {
defer t.RecordMessage("shutting down mining")
defer close(done)
@ -319,7 +324,7 @@ func runDefaultMiner(t *TestEnvironment, miner *LotusMiner) error {
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
ch := make(chan struct{})
err := miner.MineOne(ctx, func(mined bool) {
err := m.MineOne(ctx, func(mined bool) {
if mined {
t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1)
}
@ -353,7 +358,7 @@ func runDefaultMiner(t *TestEnvironment, miner *LotusMiner) error {
return nil
}
func startStorageMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error {
func (m *LotusMiner) startStorageMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error {
mux := mux.NewRouter()
rpcServer := jsonrpc.NewServer()

View File

@ -14,16 +14,12 @@ import (
)
type PubsubTracer struct {
t *TestEnvironment
host host.Host
traced *traced.TraceCollector
}
func (tr *PubsubTracer) Stop() error {
tr.traced.Stop()
return tr.host.Close()
}
func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
func PreparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
ctx := context.Background()
privk, _, err := crypto.GenerateEd25519Key(rand.Reader)
@ -59,19 +55,25 @@ func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
return &PubsubTracer{host: host, traced: traced}, nil
tracer := &PubsubTracer{t: t, host: host, traced: traced}
return tracer, nil
}
func runPubsubTracer(t *TestEnvironment, tracer *PubsubTracer) error {
t.RecordMessage("running pubsub tracer")
func (tr *PubsubTracer) RunDefault() error {
tr.t.RecordMessage("running pubsub tracer")
defer func() {
err := tracer.Stop()
err := tr.Stop()
if err != nil {
t.RecordMessage("error stoping tracer: %s", err)
tr.t.RecordMessage("error stoping tracer: %s", err)
}
}()
t.WaitUntilAllDone()
tr.t.WaitUntilAllDone()
return nil
}
func (tr *PubsubTracer) Stop() error {
tr.traced.Stop()
return tr.host.Close()
}

View File

@ -1,113 +0,0 @@
package testkit
import (
"fmt"
"reflect"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
)
type Role string
const (
RoleBootstrapper = Role("bootstrapper")
RoleMiner = Role("miner")
RoleClient = Role("client")
RoleDrand = Role("drand")
RolePubsubTracer = Role("pubsub-tracer")
)
type RoleConfig struct {
Role Role
PrepareFunc interface{}
RunFunc interface{}
nodeType reflect.Type
}
func (rc *RoleConfig) validate() error {
var (
ptyp = reflect.TypeOf(rc.PrepareFunc)
rtyp = reflect.TypeOf(rc.RunFunc)
)
// validate signature of prepare function.
// it's a function
ptypvalid := ptyp.Kind() == reflect.Func
// validate in args
ptypvalid = ptypvalid && ptyp.NumIn() == 1 && ptyp.In(0) == reflect.TypeOf((*TestEnvironment)(nil))
// validate out args
ptypvalid = ptypvalid && ptyp.NumOut() == 2 && ptyp.Out(1) == reflect.TypeOf((error)(nil))
if !ptypvalid {
return fmt.Errorf("signature of prepare function is invalid")
}
// retain node type
rc.nodeType = ptyp.Out(0)
// validate signature of run function.
rtypvalid := rtyp.Kind() == reflect.Func
// validate in args
rtypvalid = rtypvalid && rtyp.NumIn() == 2 && rtyp.In(0) == reflect.TypeOf((*TestEnvironment)(nil)) && rtyp.In(1) == rc.nodeType
// validate out args
rtypvalid = rtypvalid && rtyp.NumOut() == 1 && rtyp.Out(0) == reflect.TypeOf((error)(nil))
if !rtypvalid {
return fmt.Errorf("signature of run function is invalid")
}
return nil
}
var basicRoles = make(map[Role]*RoleConfig)
func MustRegisterRole(role Role, prepareFunc interface{}, runFunc interface{}) {
if _, ok := basicRoles[role]; ok {
panic(fmt.Errorf("duplicate role registration: %s", role))
}
rc := &RoleConfig{Role: role, PrepareFunc: prepareFunc, RunFunc: runFunc}
if err := rc.validate(); err != nil {
panic(fmt.Errorf("failed to validate role config: %w", err))
}
basicRoles[role] = rc
}
func MustOverrideRoleRun(role Role, runFunc interface{}) {
rc, ok := basicRoles[role]
if !ok {
panic(fmt.Errorf("role not registered: %s", role))
}
newrc := &RoleConfig{Role: role, PrepareFunc: rc.PrepareFunc, RunFunc: runFunc}
if err := newrc.validate(); err != nil {
panic(fmt.Errorf("failed to validate role config: %w", err))
}
basicRoles[role] = newrc
}
func ExecuteRole(role Role, runenv *runtime.RunEnv, initCtx *run.InitContext) error {
rc, ok := basicRoles[role]
if !ok {
panic(fmt.Errorf("role not registered: %s", role))
}
t := &TestEnvironment{runenv, initCtx}
prepareFn := reflect.ValueOf(rc.PrepareFunc)
runFn := reflect.ValueOf(rc.RunFunc)
out := prepareFn.Call([]reflect.Value{reflect.ValueOf(t)})
if err := out[1].Interface().(error); err != nil {
return err
}
out = runFn.Call([]reflect.Value{reflect.ValueOf(t), out[0]})
return out[0].Interface().(error)
}
func init() {
MustRegisterRole(RoleBootstrapper, prepareBootstrapper, runBootstrapper)
MustRegisterRole(RoleMiner, prepareMiner, runDefaultMiner)
MustRegisterRole(RoleClient, prepareClient, runDefaultClient)
MustRegisterRole(RoleDrand, prepareDrandNode, runDrandNode)
MustRegisterRole(RolePubsubTracer, preparePubsubTracer, runPubsubTracer)
}

View File

@ -13,6 +13,8 @@ import (
type TestEnvironment struct {
*runtime.RunEnv
*run.InitContext
Role string
}
// workaround for default params being wrapped in quote chars
@ -28,7 +30,19 @@ func (t *TestEnvironment) DurationParam(name string) time.Duration {
return d
}
// WaitUntilAllDone waits until all instances in the test case are done.
func (t *TestEnvironment) WaitUntilAllDone() {
ctx := context.Background()
t.SyncClient.MustSignalAndWait(ctx, StateDone, t.TestInstanceCount)
}
// WrapTestEnvironment takes a test case function that accepts a
// *TestEnvironment, and adapts it to the original unwrapped SDK style
// (run.InitializedTestCaseFn).
func WrapTestEnvironment(f func(t *TestEnvironment) error) run.InitializedTestCaseFn {
return func(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
t := &TestEnvironment{RunEnv: runenv, InitContext: initCtx}
t.Role = t.StringParam("role")
return f(t)
}
}