run drand gossip relays alongside each drand node

This commit is contained in:
Yusef Napora 2020-06-24 13:39:06 -04:00
parent 4a337785b5
commit e225a644f4
3 changed files with 117 additions and 26 deletions

View File

@ -3,12 +3,20 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/hex"
"encoding/json"
"fmt" "fmt"
"net"
"os"
"time" "time"
"github.com/drand/drand/chain" "github.com/drand/drand/chain"
hclient "github.com/drand/drand/client/http" hclient "github.com/drand/drand/client/http"
"github.com/drand/drand/log"
"github.com/drand/drand/lp2p"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/sync" "github.com/testground/sdk-go/sync"
"github.com/drand/drand/demo/node" "github.com/drand/drand/demo/node"
@ -16,13 +24,23 @@ import (
var ( var (
PrepareDrandTimeout = time.Minute PrepareDrandTimeout = time.Minute
drandConfigTopic = sync.NewTopic("drand-config", &dtypes.DrandConfig{}) drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{})
) )
type DrandRuntimeInfo struct {
Config dtypes.DrandConfig
GossipBootstrap dtypes.DrandBootstrap
}
type DrandInstance struct {
Node node.Node
GossipRelay *lp2p.GossipRelayNode
}
// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node // waitForDrandConfig should be called by filecoin instances before constructing the lotus Node
// you can use the returned dtypes.DrandConfig to override the default production config. // you can use the returned dtypes.DrandConfig to override the default production config.
func waitForDrandConfig(ctx context.Context, client sync.Client) (*dtypes.DrandConfig, error) { func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) {
ch := make(chan *dtypes.DrandConfig, 1) ch := make(chan *DrandRuntimeInfo, 1)
sub := client.MustSubscribe(ctx, drandConfigTopic, ch) sub := client.MustSubscribe(ctx, drandConfigTopic, ch)
select { select {
case cfg := <-ch: case cfg := <-ch:
@ -35,7 +53,7 @@ func waitForDrandConfig(ctx context.Context, client sync.Client) (*dtypes.DrandC
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group. // prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group.
// Once the chain is running, the leader publishes the chain info needed by lotus nodes on // Once the chain is running, the leader publishes the chain info needed by lotus nodes on
// drandConfigTopic // drandConfigTopic
func prepareDrandNode(t *TestEnvironment) (node.Node, error) { func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout) ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout)
defer cancel() defer cancel()
@ -46,13 +64,14 @@ func prepareDrandNode(t *TestEnvironment) (node.Node, error) {
nNodes := t.TestGroupInstanceCount nNodes := t.TestGroupInstanceCount
myAddr := t.NetClient.MustGetDataNetworkIP() myAddr := t.NetClient.MustGetDataNetworkIP()
// TODO: add test params for drand period := t.DurationParam("drand_period")
period := "10s" threshold := t.IntParam("drand_threshold")
beaconOffset := 12 runGossipRelay := t.BooleanParam("drand_gossip_relay")
threshold := 2
beaconOffset := 3
// TODO(maybe): use TLS? // TODO(maybe): use TLS?
n := node.NewLocalNode(int(seq), period, "~/", false, myAddr.String()) n := node.NewLocalNode(int(seq), period.String(), "~/", false, myAddr.String())
// share the node addresses with other nodes // share the node addresses with other nodes
// TODO: if we implement TLS, this is where we'd share public TLS keys // TODO: if we implement TLS, this is where we'd share public TLS keys
@ -107,21 +126,25 @@ func prepareDrandNode(t *TestEnvironment) (node.Node, error) {
// run DKG // run DKG
t.SyncClient.MustSignalAndWait(ctx, "drand-dkg-start", nNodes) t.SyncClient.MustSignalAndWait(ctx, "drand-dkg-start", nNodes)
grp := n.RunDKG(nNodes, threshold, period, isLeader, leaderAddr, beaconOffset) if !isLeader {
time.Sleep(time.Second)
}
grp := n.RunDKG(nNodes, threshold, period.String(), isLeader, leaderAddr, beaconOffset)
if grp == nil { if grp == nil {
return nil, fmt.Errorf("drand dkg failed") return nil, fmt.Errorf("drand dkg failed")
} }
t.R().RecordPoint("drand_dkg_complete", time.Now().Sub(startTime).Seconds()) t.R().RecordPoint("drand_dkg_complete", time.Now().Sub(startTime).Seconds())
t.RecordMessage("drand dkg complete, waiting for chain start")
// wait for chain to begin // wait for chain to begin
to := time.Until(time.Unix(grp.GenesisTime, 0).Add(3 * time.Second).Add(grp.Period)) to := time.Until(time.Unix(grp.GenesisTime, 0).Add(3 * time.Second).Add(grp.Period))
time.Sleep(to) time.Sleep(to)
t.RecordMessage("drand beacon chain started, fetching initial round via http")
// verify that we can get a round of randomness from the chain using an http client // verify that we can get a round of randomness from the chain using an http client
info := chain.NewChainInfo(grp) info := chain.NewChainInfo(grp)
client, err := hclient.NewWithInfo(publicAddrs[0], info, nil) myPublicAddr := fmt.Sprintf("http://%s", n.PublicAddr())
client, err := hclient.NewWithInfo(myPublicAddr, info, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create drand http client: %w", err) return nil, fmt.Errorf("unable to create drand http client: %w", err)
} }
@ -131,18 +154,73 @@ func prepareDrandNode(t *TestEnvironment) (node.Node, error) {
return nil, fmt.Errorf("unable to get initial drand round: %w", err) return nil, fmt.Errorf("unable to get initial drand round: %w", err)
} }
// start gossip relay (unless disabled via testplan parameter)
var gossipRelay *lp2p.GossipRelayNode
var relayAddrs []peer.AddrInfo
if runGossipRelay {
_ = os.Mkdir("~/drand-gossip", os.ModePerm)
listenAddr := fmt.Sprintf("/ip4/%s/tcp/7777", myAddr.String())
relayCfg := lp2p.GossipRelayConfig{
ChainHash: hex.EncodeToString(info.Hash()),
Addr: listenAddr,
DataDir: "~/drand-gossip",
IdentityPath: "~/drand-gossip/identity.key",
Insecure: true,
Client: client,
}
t.RecordMessage("starting drand gossip relay")
var err error
gossipRelay, err = lp2p.NewGossipRelayNode(log.DefaultLogger, &relayCfg)
if err != nil {
return nil, fmt.Errorf("failed to construct drand gossip relay: %w", err)
}
t.RecordMessage("sharing gossip relay addrs")
// share the gossip relay addrs so we can publish them in DrandRuntimeInfo
relayInfo, err := relayAddrInfo(gossipRelay.Multiaddrs(), myAddr)
if err != nil {
return nil, err
}
infoCh := make(chan *peer.AddrInfo, nNodes)
infoTopic := sync.NewTopic("drand-gossip-addrs", &peer.AddrInfo{})
t.SyncClient.MustPublishSubscribe(ctx, infoTopic, relayInfo, infoCh)
for i := 0; i < nNodes; i++ {
ai := <-infoCh
relayAddrs = append(relayAddrs, *ai)
}
}
// if we're the leader, publish the config to the sync service // if we're the leader, publish the config to the sync service
if isLeader { if isLeader {
buf := bytes.Buffer{} buf := bytes.Buffer{}
if err := info.ToJSON(&buf); err != nil { if err := info.ToJSON(&buf); err != nil {
return nil, fmt.Errorf("error marshaling chain info: %w", err) return nil, fmt.Errorf("error marshaling chain info: %w", err)
} }
msg := dtypes.DrandConfig{ cfg := DrandRuntimeInfo{
Config: dtypes.DrandConfig{
Servers: publicAddrs, Servers: publicAddrs,
ChainInfoJSON: buf.String(), ChainInfoJSON: buf.String(),
},
GossipBootstrap: relayAddrs,
} }
t.SyncClient.MustPublish(ctx, drandConfigTopic, &msg) dump, _ := json.Marshal(cfg)
t.RecordMessage("publishing drand config on sync topic: %s", string(dump))
t.SyncClient.MustPublish(ctx, drandConfigTopic, &cfg)
} }
return n, nil return &DrandInstance{
Node: n,
GossipRelay: gossipRelay,
}, nil
}
func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) {
for _, a := range addrs {
if ip, _ := a.ValueForProtocol(ma.P_IP4); ip != dataIP.String() {
continue
}
return peer.AddrInfoFromP2pAddr(a)
}
return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs)
} }

View File

@ -27,5 +27,11 @@ instances = { min = 1, max = 100, default = 5 }
miners = { type = "int", default = 1 } miners = { type = "int", default = 1 }
balance = { type = "int", default = 1 } balance = { type = "int", default = 1 }
sectors = { type = "int", default = 1 } sectors = { type = "int", default = 1 }
real_drand = { type = "bool", default = "false", desc = "set to true to use the real drand network config baked into lotus" }
role = { type = "string" } role = { type = "string" }
real_drand = { type = "bool", default = "false", desc = "set to true to use the real drand network config baked into lotus. otherwise, make sure there's a group with role='drand' and at least 2 nodes" }
# params relevant to drand nodes:
drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true }

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"os" "os"
"strings"
//"encoding/json" //"encoding/json"
"fmt" "fmt"
@ -82,6 +83,15 @@ type TestEnvironment struct {
*run.InitContext *run.InitContext
} }
func (t *TestEnvironment) DurationParam(name string) time.Duration {
s := strings.ReplaceAll(t.StringParam(name), "\"", "")
d, err := time.ParseDuration(s)
if err != nil {
panic(fmt.Errorf("invalid duration value for param '%s': %w", name, err))
}
return d
}
type Node struct { type Node struct {
fullApi api.FullNode fullApi api.FullNode
minerApi api.StorageMiner minerApi api.StorageMiner
@ -668,11 +678,8 @@ func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error
return nil, err return nil, err
} }
t.RecordMessage("setting drand config: %v", cfg) t.RecordMessage("setting drand config: %v", cfg)
return node.Options( return node.Options(
node.Override(new(dtypes.DrandConfig), *cfg), node.Override(new(dtypes.DrandConfig), cfg.Config),
node.Override(new(dtypes.DrandBootstrap), cfg.GossipBootstrap),
// FIXME: re-enable drand bootstrap peers once drand gossip relays are running in testground
node.Override(new(dtypes.DrandBootstrap), dtypes.DrandBootstrap{}),
), nil ), nil
} }