randomized network params & batch runner (#142)

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
This commit is contained in:
Yusef Napora 2020-07-27 09:58:27 -04:00 committed by GitHub
parent 7af5ab5445
commit 0d0fc2d433
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 386 additions and 1 deletions

View File

@ -0,0 +1,57 @@
[metadata]
name = "lotus-soup"
author = ""
[global]
plan = "lotus-soup"
case = "deals-e2e"
total_instances = 7
builder = "docker:go"
runner = "local:docker"
[global.build]
selectors = ["testground"]
[global.run_config]
exposed_ports = { pprof = "6060", node_rpc = "1234", miner_rpc = "2345" }
[global.build_config]
enable_go_build_cache = true
[global.run.test_params]
clients = "5"
miners = "1"
genesis_timestamp_offset = "0"
balance = "20000000" # These balances work for at most 100 nodes, since TotalFilecoin is 2B
sectors = "5"
random_beacon_type = "mock"
mining_mode = "natural"
[[groups]]
id = "bootstrapper"
[groups.instances]
count = 1
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "bootstrapper"
[[groups]]
id = "miners"
[groups.instances]
count = 1
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "miner"
latency_range = '["20ms", "300ms"]'
[[groups]]
id = "clients"
[groups.instances]
count = 5
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "client"
latency_range = '["100ms", "1500ms"]'

View File

@ -4,6 +4,7 @@ go 1.14
require (
contrib.go.opencensus.io/exporter/prometheus v0.1.0
github.com/codeskyblue/go-sh v0.0.0-20200712050446-30169cf553fe
github.com/davecgh/go-spew v1.1.1
github.com/drand/drand v1.0.3-0.20200714175734-29705eaf09d4
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef

View File

@ -137,6 +137,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 h1:sDMmm+q/3+BukdIpxwO365v/Rbspp2Nt5XntgQRXq8Q=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
github.com/codeskyblue/go-sh v0.0.0-20200712050446-30169cf553fe h1:69JI97HlzP+PH5Mi1thcGlDoBr6PS2Oe+l3mNmAkbs4=
github.com/codeskyblue/go-sh v0.0.0-20200712050446-30169cf553fe/go.mod h1:VQx0hjo2oUeQkQUET7wRwradO6f+fN5jzXgB/zROxxE=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=

120
lotus-soup/runner/main.go Normal file
View File

@ -0,0 +1,120 @@
package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path"
"github.com/codeskyblue/go-sh"
)
// jobDefinition describes one test run to be executed by a worker.
type jobDefinition struct {
// runNumber identifies the run in log output; main numbers runs starting at 1.
runNumber int
// compositionPath is the composition file passed to "testground run composition -f".
compositionPath string
// outputDir receives the collected outputs archive and the run.out log;
// runComposition creates it if it does not exist.
outputDir string
// skipStdout, when true, sends the testground client output only to run.out
// instead of also mirroring it to this process's stdout.
skipStdout bool
}
// jobResult is the outcome of a test run, as sent back on the results channel.
type jobResult struct {
// job is the definition of the run this result refers to, as filled in by
// runComposition.
job jobDefinition
// runError is non-nil when preparing or executing the run failed.
runError error
}
// runComposition executes one "testground run composition" invocation for job
// and collects its outputs into job.outputDir (created if missing).
//
// The testground client's stdout is written to "run.out" inside the output
// directory and, unless job.skipStdout is set, mirrored to this process's
// stdout as well. The collected outputs archive is requested at
// "test-outputs.tgz" in the same directory.
//
// The returned jobResult always carries the originating job, so that callers
// can attribute a failure to the right run; runError is nil on success.
func runComposition(job jobDefinition) jobResult {
	outputArchive := path.Join(job.outputDir, "test-outputs.tgz")
	cmd := sh.Command("testground", "run", "composition", "-f", job.compositionPath, "--collect", "-o", outputArchive)
	if err := os.MkdirAll(job.outputDir, os.ModePerm); err != nil {
		return jobResult{job: job, runError: fmt.Errorf("unable to make output directory: %w", err)}
	}

	outPath := path.Join(job.outputDir, "run.out")
	outFile, err := os.Create(outPath)
	if err != nil {
		return jobResult{job: job, runError: fmt.Errorf("unable to create output file %s: %w", outPath, err)}
	}
	// Release the descriptor when the run ends; a batch can execute many runs
	// in one process, so leaving these open would leak one file per run.
	defer outFile.Close()

	if job.skipStdout {
		cmd.Stdout = outFile
	} else {
		cmd.Stdout = io.MultiWriter(os.Stdout, outFile)
	}

	log.Printf("starting test run %d. writing testground client output to %s\n", job.runNumber, outPath)
	if err = cmd.Run(); err != nil {
		return jobResult{job: job, runError: err}
	}
	return jobResult{job: job}
}
// worker consumes job definitions from jobs until the channel is closed,
// running each one with runComposition and publishing its outcome on results.
// id is only used to label log output.
func worker(id int, jobs <-chan jobDefinition, results chan<- jobResult) {
	log.Printf("started worker %d\n", id)
	for {
		job, open := <-jobs
		if !open {
			// The producer closed the channel: no more work.
			return
		}
		log.Printf("worker %d started test run %d\n", id, job.runNumber)
		results <- runComposition(job)
	}
}
// buildComposition copies the composition at compositionPath to
// "composition.toml" inside outputDir and runs "testground build composition
// -w" on the copy, so that the original file is left untouched.
//
// It returns the path of the copy. If the copy itself fails, the returned path
// is empty; if the build fails, the path of the copy is still returned
// together with the build error.
func buildComposition(compositionPath string, outputDir string) (string, error) {
	copied := path.Join(outputDir, "composition.toml")
	if err := sh.Command("cp", compositionPath, copied).Run(); err != nil {
		return "", err
	}

	buildErr := sh.Command("testground", "build", "composition", "-w", "-f", copied).Run()
	return copied, buildErr
}
// main builds the composition given as the single positional argument once,
// then executes it -runs times using -parallel concurrent workers. Per-run
// artifacts are written to "run-N" subdirectories of the -output directory
// (a fresh temp directory when -output is unset). Failed runs are logged.
func main() {
	runs := flag.Int("runs", 1, "number of times to run composition")
	parallelism := flag.Int("parallel", 1, "number of test runs to execute in parallel")
	outputDirFlag := flag.String("output", "", "path to output directory (will use temp dir if unset)")
	flag.Parse()

	if len(flag.Args()) != 1 {
		log.Fatal("must provide a single composition file path argument")
	}
	// A negative run count would panic when sizing the channels below.
	if *runs < 0 {
		log.Fatalf("-runs must not be negative, got %d", *runs)
	}
	// Without at least one worker nothing would ever drain the job queue and
	// the result loop below would block forever.
	if *parallelism < 1 {
		log.Fatalf("-parallel must be at least 1, got %d", *parallelism)
	}

	outdir := *outputDirFlag
	if outdir == "" {
		var err error
		outdir, err = ioutil.TempDir(os.TempDir(), "oni-batch-run-")
		if err != nil {
			log.Fatal(err)
		}
	}
	if err := os.MkdirAll(outdir, os.ModePerm); err != nil {
		log.Fatal(err)
	}

	compositionPath := flag.Args()[0]

	// First build the composition and write out the artifacts. The build works
	// on a copy placed in the output directory to avoid modifying the original.
	log.Printf("building composition %s\n", compositionPath)
	compositionPath, err := buildComposition(compositionPath, outdir)
	if err != nil {
		log.Fatal(err)
	}

	// Both channels are buffered to the total number of runs, so neither the
	// producer loop nor the workers ever block on a full channel.
	jobs := make(chan jobDefinition, *runs)
	results := make(chan jobResult, *runs)
	for w := 1; w <= *parallelism; w++ {
		go worker(w, jobs, results)
	}

	for j := 1; j <= *runs; j++ {
		dir := path.Join(outdir, fmt.Sprintf("run-%d", j))
		// Interleaved output from concurrent runs is unreadable, so only mirror
		// the testground output to stdout when running sequentially.
		skipStdout := *parallelism != 1
		jobs <- jobDefinition{runNumber: j, compositionPath: compositionPath, outputDir: dir, skipStdout: skipStdout}
	}
	close(jobs)

	// Every job yields exactly one result; wait for all of them.
	for i := 0; i < *runs; i++ {
		r := <-results
		if r.runError != nil {
			log.Printf("error running job %d: %s\n", r.job.runNumber, r.runError)
		}
	}
}

86
lotus-soup/testkit/net.go Normal file
View File

@ -0,0 +1,86 @@
package testkit
import (
"context"
"fmt"
"time"
"github.com/testground/sdk-go/network"
"github.com/testground/sdk-go/sync"
)
// ApplyNetworkParameters configures the "default" network of this instance
// with a link shape whose values are drawn at random from the *_range test
// parameters that are set (latency, jitter, loss, corruption, reordering and
// duplication, plus the correlation variants of the latter three).
//
// Each chosen value is recorded as a data point, and the resulting link shape
// is dumped to the "network-link-shape.json" asset. When the test sidecar is
// not available the function only records a message and returns.
func ApplyNetworkParameters(t *TestEnvironment) {
	if !t.TestSidecar {
		t.RecordMessage("no test sidecar, skipping network config")
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The range values are kept in named locals because ChooseRandom is
	// declared on the pointer receiver and needs an addressable operand.
	shape := network.LinkShape{}

	if t.IsParamSet("latency_range") {
		latency := t.DurationRangeParam("latency_range")
		shape.Latency = latency.ChooseRandom()
		t.D().RecordPoint("latency_ms", float64(shape.Latency.Milliseconds()))
	}

	if t.IsParamSet("jitter_range") {
		jitter := t.DurationRangeParam("jitter_range")
		shape.Jitter = jitter.ChooseRandom()
		t.D().RecordPoint("jitter_ms", float64(shape.Jitter.Milliseconds()))
	}

	if t.IsParamSet("loss_range") {
		loss := t.FloatRangeParam("loss_range")
		shape.Loss = loss.ChooseRandom()
		t.D().RecordPoint("packet_loss", float64(shape.Loss))
	}

	if t.IsParamSet("corrupt_range") {
		corrupt := t.FloatRangeParam("corrupt_range")
		shape.Corrupt = corrupt.ChooseRandom()
		t.D().RecordPoint("corrupt_packet_probability", float64(shape.Corrupt))
	}

	if t.IsParamSet("corrupt_corr_range") {
		corruptCorr := t.FloatRangeParam("corrupt_corr_range")
		shape.CorruptCorr = corruptCorr.ChooseRandom()
		t.D().RecordPoint("corrupt_packet_correlation", float64(shape.CorruptCorr))
	}

	if t.IsParamSet("reorder_range") {
		reorder := t.FloatRangeParam("reorder_range")
		shape.Reorder = reorder.ChooseRandom()
		t.D().RecordPoint("reordered_packet_probability", float64(shape.Reorder))
	}

	if t.IsParamSet("reorder_corr_range") {
		reorderCorr := t.FloatRangeParam("reorder_corr_range")
		shape.ReorderCorr = reorderCorr.ChooseRandom()
		t.D().RecordPoint("reordered_packet_correlation", float64(shape.ReorderCorr))
	}

	if t.IsParamSet("duplicate_range") {
		duplicate := t.FloatRangeParam("duplicate_range")
		shape.Duplicate = duplicate.ChooseRandom()
		t.D().RecordPoint("duplicate_packet_probability", float64(shape.Duplicate))
	}

	if t.IsParamSet("duplicate_corr_range") {
		duplicateCorr := t.FloatRangeParam("duplicate_corr_range")
		shape.DuplicateCorr = duplicateCorr.ChooseRandom()
		t.D().RecordPoint("duplicate_packet_correlation", float64(shape.DuplicateCorr))
	}

	// The callback state is scoped to the test group and targets every
	// instance in that group.
	cfg := network.Config{
		Network:        "default",
		Enable:         true,
		Default:        shape,
		CallbackState:  sync.State(fmt.Sprintf("latency-configured-%s", t.TestGroupID)),
		CallbackTarget: t.TestGroupInstanceCount,
	}
	t.NetClient.MustConfigureNetwork(ctx, &cfg)

	t.DumpJSON("network-link-shape.json", shape)
}

View File

@ -30,6 +30,8 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
ApplyNetworkParameters(t)
pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
if err != nil {
return nil, err

View File

@ -179,6 +179,8 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout)
defer cancel()
ApplyNetworkParameters(t)
startTime := time.Now()
seq := t.GroupSeq

View File

@ -61,6 +61,8 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
ApplyNetworkParameters(t)
pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
if err != nil {
return nil, err

View File

@ -2,12 +2,12 @@ package testkit
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
)
@ -32,10 +32,41 @@ func (t *TestEnvironment) DurationParam(name string) time.Duration {
return d
}
// DurationRangeParam decodes the JSON-encoded test parameter called name into
// a DurationRange and returns it.
func (t *TestEnvironment) DurationRangeParam(name string) DurationRange {
	rng := DurationRange{}
	t.JSONParam(name, &rng)
	return rng
}
// FloatRangeParam decodes the JSON-encoded test parameter called name into a
// FloatRange and returns it.
func (t *TestEnvironment) FloatRangeParam(name string) FloatRange {
	var rng FloatRange
	t.JSONParam(name, &rng)
	return rng
}
// DebugSpew renders format and args with spew.Sprintf and records the
// resulting text as a message on the test environment.
func (t *TestEnvironment) DebugSpew(format string, args ...interface{}) {
	// RecordMessage takes a format string itself, so the already-rendered text
	// is passed as an argument; otherwise any '%' in the dumped values would be
	// interpreted as a formatting verb and garble the output.
	t.RecordMessage("%s", spew.Sprintf(format, args...))
}
// DumpJSON serializes v as JSON and writes it to a raw asset file called
// filename. This is best-effort: any failure along the way is recorded as a
// message and the function returns without reporting an error to the caller.
func (t *TestEnvironment) DumpJSON(filename string, v interface{}) {
	payload, marshalErr := json.Marshal(v)
	if marshalErr != nil {
		t.RecordMessage("unable to marshal object to JSON: %s", marshalErr)
		return
	}

	asset, createErr := t.CreateRawAsset(filename)
	if createErr != nil {
		t.RecordMessage("unable to create asset file: %s", createErr)
		return
	}
	defer asset.Close()

	if _, writeErr := asset.Write(payload); writeErr != nil {
		t.RecordMessage("error writing json object dump: %s", writeErr)
	}
}
// WaitUntilAllDone waits until all instances in the test case are done.
func (t *TestEnvironment) WaitUntilAllDone() {
ctx := context.Background()
@ -49,6 +80,9 @@ func WrapTestEnvironment(f func(t *TestEnvironment) error) run.InitializedTestCa
return func(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
t := &TestEnvironment{RunEnv: runenv, InitContext: initCtx}
t.Role = t.StringParam("role")
t.DumpJSON("test-parameters.json", t.TestInstanceParams)
return f(t)
}
}

View File

@ -0,0 +1,77 @@
package testkit
import (
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/testground/sdk-go/ptypes"
)
// DurationRange is a Testground parameter type that represents a duration
// range, suitable for use in randomized tests. This type is encoded as a JSON
// array of length 2 of element type ptypes.Duration, e.g. ["10s", "10m"].
type DurationRange struct {
	Min time.Duration
	Max time.Duration
}

// ChooseRandom returns a pseudo-random duration in the half-open interval
// [Min, Max). When the range is empty (Max <= Min) — which includes the
// Min == Max case accepted by UnmarshalJSON — it returns Min.
func (r *DurationRange) ChooseRandom() time.Duration {
	span := int64(r.Max) - int64(r.Min)
	if span <= 0 {
		// rand.Int63n panics for a non-positive argument, so a fixed-value
		// range such as ["100ms", "100ms"] must be handled explicitly.
		return r.Min
	}
	return r.Min + time.Duration(rand.Int63n(span))
}
// UnmarshalJSON decodes a two-element JSON array of duration strings into r.
// It fails if the input is not such an array, if the array does not have
// exactly two elements, or if the first element is greater than the second.
// r is left unmodified on error.
func (r *DurationRange) UnmarshalJSON(b []byte) error {
	var bounds []ptypes.Duration
	err := json.Unmarshal(b, &bounds)
	switch {
	case err != nil:
		return err
	case len(bounds) != 2:
		return fmt.Errorf("expected two-element array of duration strings, got array of length %d", len(bounds))
	case bounds[0].Duration > bounds[1].Duration:
		return fmt.Errorf("expected first element to be <= second element")
	}

	r.Min, r.Max = bounds[0].Duration, bounds[1].Duration
	return nil
}
// MarshalJSON encodes r as a two-element JSON array holding Min and Max, each
// wrapped as a ptypes.Duration.
func (r *DurationRange) MarshalJSON() ([]byte, error) {
	lower := ptypes.Duration{Duration: r.Min}
	upper := ptypes.Duration{Duration: r.Max}
	return json.Marshal([]ptypes.Duration{lower, upper})
}
// FloatRange is a Testground parameter type describing a range of float32
// values, suitable for use in randomized tests. It is encoded as a JSON array
// with exactly two float elements, e.g. [1.45, 10.675].
type FloatRange struct {
	Min float32
	Max float32
}

// ChooseRandom returns a pseudo-random value between Min and Max, computed as
// Min plus a random fraction of the range width.
func (r *FloatRange) ChooseRandom() float32 {
	width := r.Max - r.Min
	return r.Min + rand.Float32()*width
}

// UnmarshalJSON decodes a two-element JSON array of floats into r. It fails if
// the input is not such an array, if the array does not have exactly two
// elements, or if the first element is greater than the second. r is left
// unmodified on error.
func (r *FloatRange) UnmarshalJSON(b []byte) error {
	var bounds []float32
	if err := json.Unmarshal(b, &bounds); err != nil {
		return err
	}
	if n := len(bounds); n != 2 {
		return fmt.Errorf("expected two-element array of floats, got array of length %d", n)
	}

	lo, hi := bounds[0], bounds[1]
	if lo > hi {
		return fmt.Errorf("expected first element to be <= second element")
	}

	r.Min, r.Max = lo, hi
	return nil
}

// MarshalJSON encodes r as the two-element JSON array [Min, Max].
func (r *FloatRange) MarshalJSON() ([]byte, error) {
	return json.Marshal([]float32{r.Min, r.Max})
}