add pubsub tracer config to scaffolding

This commit is contained in:
vyzo 2020-06-26 11:58:56 +03:00
parent 1c2af85b20
commit 71503cce8b
2 changed files with 39 additions and 8 deletions

View File

@ -127,6 +127,11 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel() defer cancel()
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
if err != nil {
return nil, err
}
clients := t.IntParam("clients") clients := t.IntParam("clients")
miners := t.IntParam("miners") miners := t.IntParam("miners")
nodes := clients + miners nodes := clients + miners
@ -196,7 +201,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)), node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)),
withListenAddress(bootstrapperIP), withListenAddress(bootstrapperIP),
withBootstrapper(nil), withBootstrapper(nil),
withPubsubConfig(true), withPubsubConfig(true, pubsubTracer),
drandOpt, drandOpt,
) )
if err != nil { if err != nil {
@ -250,6 +255,11 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel() defer cancel()
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
if err != nil {
return nil, err
}
drandOpt, err := getDrandConfig(ctx, t) drandOpt, err := getDrandConfig(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
@ -367,7 +377,7 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
withGenesis(genesisMsg.Genesis), withGenesis(genesisMsg.Genesis),
withListenAddress(minerIP), withListenAddress(minerIP),
withBootstrapper(genesisMsg.Bootstrapper), withBootstrapper(genesisMsg.Bootstrapper),
withPubsubConfig(false), withPubsubConfig(false, pubsubTracer),
drandOpt, drandOpt,
) )
if err != nil { if err != nil {
@ -473,6 +483,11 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel() defer cancel()
pubsubTracer, err := getPubsubTracerConfig(ctx, t)
if err != nil {
return nil, err
}
drandOpt, err := getDrandConfig(ctx, t) drandOpt, err := getDrandConfig(ctx, t)
if err != nil { if err != nil {
return nil, err return nil, err
@ -506,7 +521,7 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
withGenesis(genesisMsg.Genesis), withGenesis(genesisMsg.Genesis),
withListenAddress(clientIP), withListenAddress(clientIP),
withBootstrapper(genesisMsg.Bootstrapper), withBootstrapper(genesisMsg.Bootstrapper),
withPubsubConfig(false), withPubsubConfig(false, pubsubTracer),
drandOpt, drandOpt,
) )
if err != nil { if err != nil {
@ -571,11 +586,11 @@ func withBootstrapper(ab []byte) node.Option {
}) })
} }
func withPubsubConfig(bootstrapper bool) node.Option { func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option {
return node.Override(new(*config.Pubsub), func() *config.Pubsub { return node.Override(new(*config.Pubsub), func() *config.Pubsub {
return &config.Pubsub{ return &config.Pubsub{
Bootstrapper: bootstrapper, Bootstrapper: bootstrapper,
RemoteTracer: "", RemoteTracer: pubsubTracer,
} }
}) })
} }
@ -670,6 +685,22 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([
return addrs, nil return addrs, nil
} }
func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) {
if !t.BooleanParam("enable_pubsub_tracer") {
return "", nil
}
ch := make(chan *PubsubTracerMsg)
sub := t.SyncClient.MustSubscribe(ctx, pubsubTracerTopic, ch)
select {
case m := <-ch:
return m.Tracer, nil
case err := <-sub.Done():
return "", fmt.Errorf("got error while waiting for clients addrs: %w", err)
}
}
func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) { func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) {
beaconType := t.StringParam("random_beacon_type") beaconType := t.StringParam("random_beacon_type")
switch beaconType { switch beaconType {

View File

@ -26,7 +26,7 @@ type PubsubTracer struct {
} }
type PubsubTracerMsg struct { type PubsubTracerMsg struct {
Tracer []byte Tracer string
} }
func (tr *PubsubTracer) Stop() { func (tr *PubsubTracer) Stop() {
@ -66,7 +66,7 @@ func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) {
t.RecordMessage("I am %s", tracedMultiaddrStr) t.RecordMessage("I am %s", tracedMultiaddrStr)
tracedMultiaddr := ma.StringCast(tracedMultiaddrStr) tracedMultiaddr := ma.StringCast(tracedMultiaddrStr)
tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddr.Bytes()} tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddr.String()}
t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg) t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg)
t.RecordMessage("waiting for all nodes to be ready") t.RecordMessage("waiting for all nodes to be ready")