Merge pull request #71 from filecoin-project/pubsub-tracer
Pubsub tracing
This commit is contained in:
commit
119defc5e7
@ -43,10 +43,11 @@ import (
|
|||||||
// The we create a genesis block that allocates some funds to each node and collects
|
// The we create a genesis block that allocates some funds to each node and collects
|
||||||
// the presealed sectors.
|
// the presealed sectors.
|
||||||
var baselineRoles = map[string]func(*TestEnvironment) error{
|
var baselineRoles = map[string]func(*TestEnvironment) error{
|
||||||
"bootstrapper": runBootstrapper,
|
"bootstrapper": runBootstrapper,
|
||||||
"miner": runMiner,
|
"miner": runMiner,
|
||||||
"client": runBaselineClient,
|
"client": runBaselineClient,
|
||||||
"drand": runDrandNode,
|
"drand": runDrandNode,
|
||||||
|
"pubsub-tracer": runPubsubTracer,
|
||||||
}
|
}
|
||||||
|
|
||||||
func runBaselineClient(t *TestEnvironment) error {
|
func runBaselineClient(t *TestEnvironment) error {
|
||||||
@ -237,4 +238,3 @@ func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte {
|
|||||||
}
|
}
|
||||||
return rdata
|
return rdata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
68
lotus-soup/compositions/composition-tracer.toml
Normal file
68
lotus-soup/compositions/composition-tracer.toml
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
[metadata]
|
||||||
|
name = "lotus-soup"
|
||||||
|
author = ""
|
||||||
|
|
||||||
|
[global]
|
||||||
|
plan = "lotus-soup"
|
||||||
|
case = "lotus-baseline"
|
||||||
|
total_instances = 7
|
||||||
|
builder = "docker:go"
|
||||||
|
runner = "local:docker"
|
||||||
|
|
||||||
|
[global.build_config]
|
||||||
|
enable_go_build_cache = true
|
||||||
|
|
||||||
|
[[groups]]
|
||||||
|
id = "pubsub-tracer"
|
||||||
|
[groups.instances]
|
||||||
|
count = 1
|
||||||
|
percentage = 0.0
|
||||||
|
[groups.run]
|
||||||
|
[groups.run.test_params]
|
||||||
|
role = "pubsub-tracer"
|
||||||
|
|
||||||
|
[[groups]]
|
||||||
|
id = "bootstrapper"
|
||||||
|
[groups.instances]
|
||||||
|
count = 1
|
||||||
|
percentage = 0.0
|
||||||
|
[groups.run]
|
||||||
|
[groups.run.test_params]
|
||||||
|
role = "bootstrapper"
|
||||||
|
clients = "3"
|
||||||
|
miners = "2"
|
||||||
|
balance = "2000000000"
|
||||||
|
sectors = "10"
|
||||||
|
random_beacon_type = "mock"
|
||||||
|
enable_pubsub_tracer = "true"
|
||||||
|
|
||||||
|
[[groups]]
|
||||||
|
id = "miners"
|
||||||
|
[groups.instances]
|
||||||
|
count = 2
|
||||||
|
percentage = 0.0
|
||||||
|
[groups.run]
|
||||||
|
[groups.run.test_params]
|
||||||
|
role = "miner"
|
||||||
|
clients = "3"
|
||||||
|
miners = "2"
|
||||||
|
balance = "2000000000"
|
||||||
|
sectors = "10"
|
||||||
|
random_beacon_type = "mock"
|
||||||
|
enable_pubsub_tracer = "true"
|
||||||
|
|
||||||
|
|
||||||
|
[[groups]]
|
||||||
|
id = "clients"
|
||||||
|
[groups.instances]
|
||||||
|
count = 3
|
||||||
|
percentage = 0.0
|
||||||
|
[groups.run]
|
||||||
|
[groups.run.test_params]
|
||||||
|
role = "client"
|
||||||
|
clients = "3"
|
||||||
|
miners = "2"
|
||||||
|
balance = "2000000000"
|
||||||
|
sectors = "10"
|
||||||
|
random_beacon_type = "mock"
|
||||||
|
enable_pubsub_tracer = "true"
|
@ -3,7 +3,6 @@ module github.com/filecoin-project/oni/lotus-soup
|
|||||||
go 1.14
|
go 1.14
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/davecgh/go-spew v1.1.1
|
|
||||||
github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4
|
github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4
|
||||||
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
|
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
|
||||||
github.com/filecoin-project/go-fil-markets v0.3.0
|
github.com/filecoin-project/go-fil-markets v0.3.0
|
||||||
@ -18,7 +17,9 @@ require (
|
|||||||
github.com/ipfs/go-merkledag v0.3.1
|
github.com/ipfs/go-merkledag v0.3.1
|
||||||
github.com/ipfs/go-unixfs v0.2.4
|
github.com/ipfs/go-unixfs v0.2.4
|
||||||
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
|
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
|
||||||
|
github.com/libp2p/go-libp2p v0.10.0
|
||||||
github.com/libp2p/go-libp2p-core v0.6.0
|
github.com/libp2p/go-libp2p-core v0.6.0
|
||||||
|
github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6
|
||||||
github.com/multiformats/go-multiaddr v0.2.2
|
github.com/multiformats/go-multiaddr v0.2.2
|
||||||
github.com/testground/sdk-go v0.2.3-0.20200617132925-2e4d69f9ba38
|
github.com/testground/sdk-go v0.2.3-0.20200617132925-2e4d69f9ba38
|
||||||
)
|
)
|
||||||
|
@ -861,6 +861,8 @@ github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uz
|
|||||||
github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h1:TxPOBuo1FPdsTjFnv+FGZbNbWYsp74Culx+4ViQpato=
|
github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h1:TxPOBuo1FPdsTjFnv+FGZbNbWYsp74Culx+4ViQpato=
|
||||||
github.com/libp2p/go-libp2p-pubsub v0.3.2 h1:k3cJm5JW5mjaWZkobS50sJLJWaB2mBi0HW4eRlE8mSo=
|
github.com/libp2p/go-libp2p-pubsub v0.3.2 h1:k3cJm5JW5mjaWZkobS50sJLJWaB2mBi0HW4eRlE8mSo=
|
||||||
github.com/libp2p/go-libp2p-pubsub v0.3.2/go.mod h1:Uss7/Cfz872KggNb+doCVPHeCDmXB7z500m/R8DaAUk=
|
github.com/libp2p/go-libp2p-pubsub v0.3.2/go.mod h1:Uss7/Cfz872KggNb+doCVPHeCDmXB7z500m/R8DaAUk=
|
||||||
|
github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6 h1:2lH7rMlvDPSvXeOR+g7FE6aqiEwxtpxWKQL8uigk5fQ=
|
||||||
|
github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6/go.mod h1:8ZodgKS4qRLayfw9FDKDd9DX4C16/GMofDxSldG8QPI=
|
||||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
|
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
|
||||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
|
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
|
||||||
github.com/libp2p/go-libp2p-quic-transport v0.5.0 h1:BUN1lgYNUrtv4WLLQ5rQmC9MCJ6uEXusezGvYRNoJXE=
|
github.com/libp2p/go-libp2p-quic-transport v0.5.0 h1:BUN1lgYNUrtv4WLLQ5rQmC9MCJ6uEXusezGvYRNoJXE=
|
||||||
|
@ -36,3 +36,6 @@ instances = { min = 1, max = 100, default = 5 }
|
|||||||
drand_period = { type = "duration", default="10s" }
|
drand_period = { type = "duration", default="10s" }
|
||||||
drand_threshold = { type = "int", default = 2 }
|
drand_threshold = { type = "int", default = 2 }
|
||||||
drand_gossip_relay = { type = "bool", default = true }
|
drand_gossip_relay = { type = "bool", default = true }
|
||||||
|
|
||||||
|
# Params relevant to pubsub tracing
|
||||||
|
enable_pubsub_tracer = { type = "bool", default = false }
|
||||||
|
@ -127,6 +127,11 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
clients := t.IntParam("clients")
|
clients := t.IntParam("clients")
|
||||||
miners := t.IntParam("miners")
|
miners := t.IntParam("miners")
|
||||||
nodes := clients + miners
|
nodes := clients + miners
|
||||||
@ -196,7 +201,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
|
|||||||
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
|
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
|
||||||
withListenAddress(bootstrapperIP),
|
withListenAddress(bootstrapperIP),
|
||||||
withBootstrapper(nil),
|
withBootstrapper(nil),
|
||||||
withPubsubConfig(true),
|
withPubsubConfig(true, pubsubTracer),
|
||||||
drandOpt,
|
drandOpt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -250,6 +255,11 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
drandOpt, err := getDrandConfig(ctx, t)
|
drandOpt, err := getDrandConfig(ctx, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -367,7 +377,7 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
|
|||||||
withGenesis(genesisMsg.Genesis),
|
withGenesis(genesisMsg.Genesis),
|
||||||
withListenAddress(minerIP),
|
withListenAddress(minerIP),
|
||||||
withBootstrapper(genesisMsg.Bootstrapper),
|
withBootstrapper(genesisMsg.Bootstrapper),
|
||||||
withPubsubConfig(false),
|
withPubsubConfig(false, pubsubTracer),
|
||||||
drandOpt,
|
drandOpt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -473,6 +483,11 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
drandOpt, err := getDrandConfig(ctx, t)
|
drandOpt, err := getDrandConfig(ctx, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -506,7 +521,7 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
|
|||||||
withGenesis(genesisMsg.Genesis),
|
withGenesis(genesisMsg.Genesis),
|
||||||
withListenAddress(clientIP),
|
withListenAddress(clientIP),
|
||||||
withBootstrapper(genesisMsg.Bootstrapper),
|
withBootstrapper(genesisMsg.Bootstrapper),
|
||||||
withPubsubConfig(false),
|
withPubsubConfig(false, pubsubTracer),
|
||||||
drandOpt,
|
drandOpt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -571,11 +586,11 @@ func withBootstrapper(ab []byte) node.Option {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func withPubsubConfig(bootstrapper bool) node.Option {
|
func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option {
|
||||||
return node.Override(new(*config.Pubsub), func() *config.Pubsub {
|
return node.Override(new(*config.Pubsub), func() *config.Pubsub {
|
||||||
return &config.Pubsub{
|
return &config.Pubsub{
|
||||||
Bootstrapper: bootstrapper,
|
Bootstrapper: bootstrapper,
|
||||||
RemoteTracer: "",
|
RemoteTracer: pubsubTracer,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -670,6 +685,22 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([
|
|||||||
return addrs, nil
|
return addrs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) {
|
||||||
|
if !t.BooleanParam("enable_pubsub_tracer") {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *PubsubTracerMsg)
|
||||||
|
sub := t.SyncClient.MustSubscribe(ctx, pubsubTracerTopic, ch)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case m := <-ch:
|
||||||
|
return m.Tracer, nil
|
||||||
|
case err := <-sub.Done():
|
||||||
|
return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) {
|
func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) {
|
||||||
beaconType := t.StringParam("random_beacon_type")
|
beaconType := t.StringParam("random_beacon_type")
|
||||||
switch beaconType {
|
switch beaconType {
|
||||||
@ -684,7 +715,7 @@ func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.RecordMessage("error getting drand config: %w", err)
|
t.RecordMessage("error getting drand config: %w", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
||||||
}
|
}
|
||||||
t.RecordMessage("setting drand config: %v", cfg)
|
t.RecordMessage("setting drand config: %v", cfg)
|
||||||
return node.Options(
|
return node.Options(
|
||||||
|
92
lotus-soup/tracer.go
Normal file
92
lotus-soup/tracer.go
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/testground/sdk-go/sync"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p"
|
||||||
|
"github.com/libp2p/go-libp2p-core/crypto"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-pubsub-tracer/traced"
|
||||||
|
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{})
|
||||||
|
)
|
||||||
|
|
||||||
|
type PubsubTracer struct {
|
||||||
|
host host.Host
|
||||||
|
traced *traced.TraceCollector
|
||||||
|
}
|
||||||
|
|
||||||
|
type PubsubTracerMsg struct {
|
||||||
|
Tracer string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr *PubsubTracer) Stop() error {
|
||||||
|
tr.traced.Stop()
|
||||||
|
return tr.host.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
privk, _, err := crypto.GenerateEd25519Key(rand.Reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tracedIP := t.NetClient.MustGetDataNetworkIP().String()
|
||||||
|
tracedAddr := fmt.Sprintf("/ip4/%s/tcp/4001", tracedIP)
|
||||||
|
|
||||||
|
host, err := libp2p.New(ctx,
|
||||||
|
libp2p.Identity(privk),
|
||||||
|
libp2p.ListenAddrStrings(tracedAddr),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tracedDir := t.TestOutputsPath + "/traced.logs"
|
||||||
|
traced, err := traced.NewTraceCollector(host, tracedDir)
|
||||||
|
if err != nil {
|
||||||
|
host.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tracedMultiaddrStr := fmt.Sprintf("%s/p2p/%s", tracedAddr, host.ID())
|
||||||
|
t.RecordMessage("I am %s", tracedMultiaddrStr)
|
||||||
|
|
||||||
|
_ = ma.StringCast(tracedMultiaddrStr)
|
||||||
|
tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddrStr}
|
||||||
|
t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg)
|
||||||
|
|
||||||
|
t.RecordMessage("waiting for all nodes to be ready")
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
|
||||||
|
|
||||||
|
return &PubsubTracer{host: host, traced: traced}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runPubsubTracer(t *TestEnvironment) error {
|
||||||
|
t.RecordMessage("running pubsub tracer")
|
||||||
|
tracer, err := preparePubsubTracer(t)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err := tracer.Stop()
|
||||||
|
if err != nil {
|
||||||
|
t.RecordMessage("error stoping tracer: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user