diff --git a/lotus-soup/baseline.go b/lotus-soup/baseline.go index 6f5cf9678..df0e195b2 100644 --- a/lotus-soup/baseline.go +++ b/lotus-soup/baseline.go @@ -43,10 +43,11 @@ import ( // The we create a genesis block that allocates some funds to each node and collects // the presealed sectors. var baselineRoles = map[string]func(*TestEnvironment) error{ - "bootstrapper": runBootstrapper, - "miner": runMiner, - "client": runBaselineClient, - "drand": runDrandNode, + "bootstrapper": runBootstrapper, + "miner": runMiner, + "client": runBaselineClient, + "drand": runDrandNode, + "pubsub-tracer": runPubsubTracer, } func runBaselineClient(t *TestEnvironment) error { @@ -237,4 +238,3 @@ func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte { } return rdata } - diff --git a/lotus-soup/compositions/composition-tracer.toml b/lotus-soup/compositions/composition-tracer.toml new file mode 100644 index 000000000..ec3e5dc5e --- /dev/null +++ b/lotus-soup/compositions/composition-tracer.toml @@ -0,0 +1,68 @@ +[metadata] + name = "lotus-soup" + author = "" + +[global] + plan = "lotus-soup" + case = "lotus-baseline" + total_instances = 7 + builder = "docker:go" + runner = "local:docker" + +[global.build_config] + enable_go_build_cache = true + +[[groups]] + id = "pubsub-tracer" + [groups.instances] + count = 1 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "pubsub-tracer" + +[[groups]] + id = "bootstrapper" + [groups.instances] + count = 1 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "bootstrapper" + clients = "3" + miners = "2" + balance = "2000000000" + sectors = "10" + random_beacon_type = "mock" + enable_pubsub_tracer = "true" + +[[groups]] + id = "miners" + [groups.instances] + count = 2 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "miner" + clients = "3" + miners = "2" + balance = "2000000000" + sectors = "10" + random_beacon_type = "mock" + enable_pubsub_tracer = "true" + + +[[groups]] + id = "clients" + [groups.instances] + count = 3 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "client" + clients = "3" + miners = "2" + balance = "2000000000" + sectors = "10" + random_beacon_type = "mock" + enable_pubsub_tracer = "true" diff --git a/lotus-soup/go.mod b/lotus-soup/go.mod index 91704679b..018902235 100644 --- a/lotus-soup/go.mod +++ b/lotus-soup/go.mod @@ -3,7 +3,6 @@ module github.com/filecoin-project/oni/lotus-soup go 1.14 require ( - github.com/davecgh/go-spew v1.1.1 github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4 github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/filecoin-project/go-fil-markets v0.3.0 @@ -18,7 +17,9 @@ require ( github.com/ipfs/go-merkledag v0.3.1 github.com/ipfs/go-unixfs v0.2.4 github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae + github.com/libp2p/go-libp2p v0.10.0 github.com/libp2p/go-libp2p-core v0.6.0 + github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6 github.com/multiformats/go-multiaddr v0.2.2 github.com/testground/sdk-go v0.2.3-0.20200617132925-2e4d69f9ba38 ) diff --git a/lotus-soup/go.sum b/lotus-soup/go.sum index 433b3cb64..8116e54d0 100644 --- a/lotus-soup/go.sum +++ b/lotus-soup/go.sum @@ -861,6 +861,8 @@ github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uz github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h1:TxPOBuo1FPdsTjFnv+FGZbNbWYsp74Culx+4ViQpato= github.com/libp2p/go-libp2p-pubsub v0.3.2 h1:k3cJm5JW5mjaWZkobS50sJLJWaB2mBi0HW4eRlE8mSo= github.com/libp2p/go-libp2p-pubsub v0.3.2/go.mod h1:Uss7/Cfz872KggNb+doCVPHeCDmXB7z500m/R8DaAUk= +github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6 h1:2lH7rMlvDPSvXeOR+g7FE6aqiEwxtpxWKQL8uigk5fQ= +github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6/go.mod h1:8ZodgKS4qRLayfw9FDKDd9DX4C16/GMofDxSldG8QPI= github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA= github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU= github.com/libp2p/go-libp2p-quic-transport v0.5.0 h1:BUN1lgYNUrtv4WLLQ5rQmC9MCJ6uEXusezGvYRNoJXE= diff --git a/lotus-soup/manifest.toml b/lotus-soup/manifest.toml index 6b75b00d1..f33fdd645 100644 --- a/lotus-soup/manifest.toml +++ b/lotus-soup/manifest.toml @@ -36,3 +36,6 @@ instances = { min = 1, max = 100, default = 5 } drand_period = { type = "duration", default="10s" } drand_threshold = { type = "int", default = 2 } drand_gossip_relay = { type = "bool", default = true } + + # Params relevant to pubsub tracing + enable_pubsub_tracer = { type = "bool", default = false } diff --git a/lotus-soup/node.go b/lotus-soup/node.go index a57fb5f82..8a04b6775 100644 --- a/lotus-soup/node.go +++ b/lotus-soup/node.go @@ -127,6 +127,11 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) { ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) defer cancel() + pubsubTracer, err := getPubsubTracerConfig(ctx, t) + if err != nil { + return nil, err + } + clients := t.IntParam("clients") miners := t.IntParam("miners") nodes := clients + miners @@ -196,7 +201,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) { node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)), withListenAddress(bootstrapperIP), withBootstrapper(nil), - withPubsubConfig(true), + withPubsubConfig(true, pubsubTracer), drandOpt, ) if err != nil { @@ -250,6 +255,11 @@ func prepareMiner(t *TestEnvironment) (*Node, error) { ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) defer cancel() + pubsubTracer, err := getPubsubTracerConfig(ctx, t) + if err != nil { + return nil, err + } + drandOpt, err := getDrandConfig(ctx, t) if err != nil { return nil, err @@ -367,7 +377,7 @@ func prepareMiner(t *TestEnvironment) (*Node, error) { withGenesis(genesisMsg.Genesis), withListenAddress(minerIP), withBootstrapper(genesisMsg.Bootstrapper), - withPubsubConfig(false), + withPubsubConfig(false, pubsubTracer), drandOpt, ) if err != nil { @@ -473,6 +483,11 @@ func prepareClient(t *TestEnvironment) (*Node, error) { ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) defer cancel() + pubsubTracer, err := getPubsubTracerConfig(ctx, t) + if err != nil { + return nil, err + } + drandOpt, err := getDrandConfig(ctx, t) if err != nil { return nil, err @@ -506,7 +521,7 @@ func prepareClient(t *TestEnvironment) (*Node, error) { withGenesis(genesisMsg.Genesis), withListenAddress(clientIP), withBootstrapper(genesisMsg.Bootstrapper), - withPubsubConfig(false), + withPubsubConfig(false, pubsubTracer), drandOpt, ) if err != nil { @@ -571,11 +586,11 @@ func withBootstrapper(ab []byte) node.Option { }) } -func withPubsubConfig(bootstrapper bool) node.Option { +func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option { return node.Override(new(*config.Pubsub), func() *config.Pubsub { return &config.Pubsub{ Bootstrapper: bootstrapper, - RemoteTracer: "", + RemoteTracer: pubsubTracer, } }) } @@ -670,6 +685,22 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([ return addrs, nil } +func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) { + if !t.BooleanParam("enable_pubsub_tracer") { + return "", nil + } + + ch := make(chan *PubsubTracerMsg) + sub := t.SyncClient.MustSubscribe(ctx, pubsubTracerTopic, ch) + + select { + case m := <-ch: + return m.Tracer, nil + case err := <-sub.Done(): + return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err) + } +} + func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) { beaconType := t.StringParam("random_beacon_type") switch beaconType { @@ -684,7 +715,7 @@ func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error if err != nil { t.RecordMessage("error getting drand config: %w", err) return nil, err - + } t.RecordMessage("setting drand config: %v", cfg) return node.Options( diff --git a/lotus-soup/tracer.go b/lotus-soup/tracer.go new file mode 100644 index 000000000..17a287615 --- /dev/null +++ b/lotus-soup/tracer.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "crypto/rand" + "fmt" + + "github.com/testground/sdk-go/sync" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-pubsub-tracer/traced" + + ma "github.com/multiformats/go-multiaddr" +) + +var ( + pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{}) +) + +type PubsubTracer struct { + host host.Host + traced *traced.TraceCollector +} + +type PubsubTracerMsg struct { + Tracer string +} + +func (tr *PubsubTracer) Stop() error { + tr.traced.Stop() + return tr.host.Close() +} + +func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) { + ctx := context.Background() + + privk, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return nil, err + } + + tracedIP := t.NetClient.MustGetDataNetworkIP().String() + tracedAddr := fmt.Sprintf("/ip4/%s/tcp/4001", tracedIP) + + host, err := libp2p.New(ctx, + libp2p.Identity(privk), + libp2p.ListenAddrStrings(tracedAddr), + ) + if err != nil { + return nil, err + } + + tracedDir := t.TestOutputsPath + "/traced.logs" + traced, err := traced.NewTraceCollector(host, tracedDir) + if err != nil { + host.Close() + return nil, err + } + + tracedMultiaddrStr := fmt.Sprintf("%s/p2p/%s", tracedAddr, host.ID()) + t.RecordMessage("I am %s", tracedMultiaddrStr) + + _ = ma.StringCast(tracedMultiaddrStr) + tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddrStr} + t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg) + + t.RecordMessage("waiting for all nodes to be ready") + t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) + + return &PubsubTracer{host: host, traced: traced}, nil +} + +func runPubsubTracer(t *TestEnvironment) error { + t.RecordMessage("running pubsub tracer") + tracer, err := preparePubsubTracer(t) + if err != nil { + return err + } + + defer func() { + err := tracer.Stop() + if err != nil { + t.RecordMessage("error stoping tracer: %s", err) + } + }() + + ctx := context.Background() + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + return nil +}