392 lines
11 KiB
Go
392 lines
11 KiB
Go
package testkit
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/drand/drand/chain"
|
|
"github.com/drand/drand/client"
|
|
hclient "github.com/drand/drand/client/http"
|
|
"github.com/drand/drand/core"
|
|
"github.com/drand/drand/key"
|
|
"github.com/drand/drand/log"
|
|
"github.com/drand/drand/lp2p"
|
|
dnet "github.com/drand/drand/net"
|
|
"github.com/drand/drand/protobuf/drand"
|
|
dtest "github.com/drand/drand/test"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
"github.com/testground/sdk-go/sync"
|
|
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/testplans/lotus-soup/statemachine"
|
|
)
|
|
|
|
var (
|
|
PrepareDrandTimeout = 3 * time.Minute
|
|
secretDKG = "dkgsecret"
|
|
)
|
|
|
|
type DrandInstance struct {
|
|
daemon *core.Drand
|
|
httpClient client.Client
|
|
ctrlClient *dnet.ControlClient
|
|
gossipRelay *lp2p.GossipRelayNode
|
|
|
|
t *TestEnvironment
|
|
stateDir string
|
|
priv *key.Pair
|
|
pubAddr string
|
|
privAddr string
|
|
ctrlAddr string
|
|
}
|
|
|
|
func (dr *DrandInstance) Start() error {
|
|
opts := []core.ConfigOption{
|
|
core.WithLogLevel(getLogLevel(dr.t)),
|
|
core.WithConfigFolder(dr.stateDir),
|
|
core.WithPublicListenAddress(dr.pubAddr),
|
|
core.WithPrivateListenAddress(dr.privAddr),
|
|
core.WithControlPort(dr.ctrlAddr),
|
|
core.WithInsecure(),
|
|
}
|
|
conf := core.NewConfig(opts...)
|
|
fs := key.NewFileStore(conf.ConfigFolder())
|
|
fs.SaveKeyPair(dr.priv)
|
|
key.Save(path.Join(dr.stateDir, "public.toml"), dr.priv.Public, false)
|
|
if dr.daemon == nil {
|
|
drand, err := core.NewDrand(fs, conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dr.daemon = drand
|
|
} else {
|
|
drand, err := core.LoadDrand(fs, conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
drand.StartBeacon(true)
|
|
dr.daemon = drand
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (dr *DrandInstance) Ping() bool {
|
|
cl := dr.ctrl()
|
|
if err := cl.Ping(); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (dr *DrandInstance) Close() error {
|
|
dr.gossipRelay.Shutdown()
|
|
dr.daemon.Stop(context.Background())
|
|
return os.RemoveAll(dr.stateDir)
|
|
}
|
|
|
|
func (dr *DrandInstance) ctrl() *dnet.ControlClient {
|
|
if dr.ctrlClient != nil {
|
|
return dr.ctrlClient
|
|
}
|
|
cl, err := dnet.NewControlClient(dr.ctrlAddr)
|
|
if err != nil {
|
|
dr.t.RecordMessage("drand can't instantiate control client: %w", err)
|
|
return nil
|
|
}
|
|
dr.ctrlClient = cl
|
|
return cl
|
|
}
|
|
|
|
func (dr *DrandInstance) RunDKG(nodes, thr int, timeout string, leader bool, leaderAddr string, beaconOffset int) *key.Group {
|
|
cl := dr.ctrl()
|
|
p := dr.t.DurationParam("drand_period")
|
|
catchupPeriod := dr.t.DurationParam("drand_catchup_period")
|
|
t, _ := time.ParseDuration(timeout)
|
|
var grp *drand.GroupPacket
|
|
var err error
|
|
if leader {
|
|
grp, err = cl.InitDKGLeader(nodes, thr, p, catchupPeriod, t, nil, secretDKG, beaconOffset)
|
|
} else {
|
|
leader := dnet.CreatePeer(leaderAddr, false)
|
|
grp, err = cl.InitDKG(leader, nil, secretDKG)
|
|
}
|
|
if err != nil {
|
|
dr.t.RecordMessage("drand dkg run failed: %w", err)
|
|
return nil
|
|
}
|
|
kg, _ := key.GroupFromProto(grp)
|
|
return kg
|
|
}
|
|
|
|
func (dr *DrandInstance) Halt() {
|
|
dr.t.RecordMessage("drand node #%d halting", dr.t.GroupSeq)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
dr.daemon.Stop(ctx)
|
|
}
|
|
|
|
func (dr *DrandInstance) Resume() {
|
|
dr.t.RecordMessage("drand node #%d resuming", dr.t.GroupSeq)
|
|
dr.Start()
|
|
// block until we can fetch the round corresponding to the current time
|
|
startTime := time.Now()
|
|
round := dr.httpClient.RoundAt(startTime)
|
|
timeout := 120 * time.Second
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
done := make(chan struct{}, 1)
|
|
go func() {
|
|
for {
|
|
res, err := dr.httpClient.Get(ctx, round)
|
|
if err == nil {
|
|
dr.t.RecordMessage("drand chain caught up to round %d", res.Round())
|
|
done <- struct{}{}
|
|
return
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
dr.t.RecordMessage("drand chain failed to catch up after %s", timeout.String())
|
|
case <-done:
|
|
dr.t.RecordMessage("drand chain resumed after %s catchup time", time.Since(startTime))
|
|
}
|
|
}
|
|
|
|
func (dr *DrandInstance) RunDefault() error {
|
|
dr.t.RecordMessage("running drand node")
|
|
|
|
if dr.t.IsParamSet("suspend_events") {
|
|
suspender := statemachine.NewSuspender(dr, dr.t.RecordMessage)
|
|
suspender.RunEvents(dr.t.StringParam("suspend_events"))
|
|
}
|
|
|
|
dr.t.WaitUntilAllDone()
|
|
return nil
|
|
}
|
|
|
|
// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group.
|
|
// Once the chain is running, the leader publishes the chain info needed by lotus nodes on
|
|
// drandConfigTopic
|
|
func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout)
|
|
defer cancel()
|
|
|
|
ApplyNetworkParameters(t)
|
|
|
|
startTime := time.Now()
|
|
|
|
seq := t.GroupSeq
|
|
isLeader := seq == 1
|
|
nNodes := t.TestGroupInstanceCount
|
|
|
|
myAddr := t.NetClient.MustGetDataNetworkIP()
|
|
threshold := t.IntParam("drand_threshold")
|
|
runGossipRelay := t.BooleanParam("drand_gossip_relay")
|
|
|
|
beaconOffset := 3
|
|
|
|
stateDir, err := ioutil.TempDir("/tmp", fmt.Sprintf("drand-%d", t.GroupSeq))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dr := DrandInstance{
|
|
t: t,
|
|
stateDir: stateDir,
|
|
pubAddr: dtest.FreeBind(myAddr.String()),
|
|
privAddr: dtest.FreeBind(myAddr.String()),
|
|
ctrlAddr: dtest.FreeBind("localhost"),
|
|
}
|
|
dr.priv = key.NewKeyPair(dr.privAddr)
|
|
|
|
// share the node addresses with other nodes
|
|
// TODO: if we implement TLS, this is where we'd share public TLS keys
|
|
type NodeAddr struct {
|
|
PrivateAddr string
|
|
PublicAddr string
|
|
IsLeader bool
|
|
}
|
|
addrTopic := sync.NewTopic("drand-addrs", &NodeAddr{})
|
|
var publicAddrs []string
|
|
var leaderAddr string
|
|
ch := make(chan *NodeAddr)
|
|
_, sub := t.SyncClient.MustPublishSubscribe(ctx, addrTopic, &NodeAddr{
|
|
PrivateAddr: dr.privAddr,
|
|
PublicAddr: dr.pubAddr,
|
|
IsLeader: isLeader,
|
|
}, ch)
|
|
for i := 0; i < nNodes; i++ {
|
|
select {
|
|
case msg := <-ch:
|
|
publicAddrs = append(publicAddrs, fmt.Sprintf("http://%s", msg.PublicAddr))
|
|
if msg.IsLeader {
|
|
leaderAddr = msg.PrivateAddr
|
|
}
|
|
case err := <-sub.Done():
|
|
return nil, fmt.Errorf("unable to read drand addrs from sync service: %w", err)
|
|
}
|
|
}
|
|
if leaderAddr == "" {
|
|
return nil, fmt.Errorf("got %d drand addrs, but no leader", len(publicAddrs))
|
|
}
|
|
|
|
t.SyncClient.MustSignalAndWait(ctx, "drand-start", nNodes)
|
|
t.RecordMessage("Starting drand sharing ceremony")
|
|
if err := dr.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
alive := false
|
|
waitSecs := 10
|
|
for i := 0; i < waitSecs; i++ {
|
|
if !dr.Ping() {
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
t.R().RecordPoint("drand_first_ping", time.Now().Sub(startTime).Seconds())
|
|
alive = true
|
|
break
|
|
}
|
|
if !alive {
|
|
return nil, fmt.Errorf("drand node %d failed to start after %d seconds", t.GroupSeq, waitSecs)
|
|
}
|
|
|
|
// run DKG
|
|
t.SyncClient.MustSignalAndWait(ctx, "drand-dkg-start", nNodes)
|
|
if !isLeader {
|
|
time.Sleep(3 * time.Second)
|
|
}
|
|
grp := dr.RunDKG(nNodes, threshold, "10s", isLeader, leaderAddr, beaconOffset)
|
|
if grp == nil {
|
|
return nil, fmt.Errorf("drand dkg failed")
|
|
}
|
|
t.R().RecordPoint("drand_dkg_complete", time.Now().Sub(startTime).Seconds())
|
|
|
|
t.RecordMessage("drand dkg complete, waiting for chain start: %v", time.Until(time.Unix(grp.GenesisTime, 0).Add(grp.Period)))
|
|
|
|
// wait for chain to begin
|
|
to := time.Until(time.Unix(grp.GenesisTime, 0).Add(5 * time.Second).Add(grp.Period))
|
|
time.Sleep(to)
|
|
|
|
t.RecordMessage("drand beacon chain started, fetching initial round via http")
|
|
// verify that we can get a round of randomness from the chain using an http client
|
|
info := chain.NewChainInfo(grp)
|
|
myPublicAddr := fmt.Sprintf("http://%s", dr.pubAddr)
|
|
dr.httpClient, err = hclient.NewWithInfo(myPublicAddr, info, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create drand http client: %w", err)
|
|
}
|
|
|
|
_, err = dr.httpClient.Get(ctx, 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to get initial drand round: %w", err)
|
|
}
|
|
|
|
// start gossip relay (unless disabled via testplan parameter)
|
|
var relayAddrs []peer.AddrInfo
|
|
|
|
if runGossipRelay {
|
|
gossipDir := path.Join(stateDir, "gossip-relay")
|
|
listenAddr := fmt.Sprintf("/ip4/%s/tcp/7777", myAddr.String())
|
|
relayCfg := lp2p.GossipRelayConfig{
|
|
ChainHash: hex.EncodeToString(info.Hash()),
|
|
Addr: listenAddr,
|
|
DataDir: gossipDir,
|
|
IdentityPath: path.Join(gossipDir, "identity.key"),
|
|
Insecure: true,
|
|
Client: dr.httpClient,
|
|
}
|
|
t.RecordMessage("starting drand gossip relay")
|
|
dr.gossipRelay, err = lp2p.NewGossipRelayNode(log.NewLogger(nil, getLogLevel(t)), &relayCfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to construct drand gossip relay: %w", err)
|
|
}
|
|
|
|
t.RecordMessage("sharing gossip relay addrs")
|
|
// share the gossip relay addrs so we can publish them in DrandRuntimeInfo
|
|
relayInfo, err := relayAddrInfo(dr.gossipRelay.Multiaddrs(), myAddr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
infoCh := make(chan *peer.AddrInfo, nNodes)
|
|
infoTopic := sync.NewTopic("drand-gossip-addrs", &peer.AddrInfo{})
|
|
|
|
_, sub := t.SyncClient.MustPublishSubscribe(ctx, infoTopic, relayInfo, infoCh)
|
|
for i := 0; i < nNodes; i++ {
|
|
select {
|
|
case ai := <-infoCh:
|
|
relayAddrs = append(relayAddrs, *ai)
|
|
case err := <-sub.Done():
|
|
return nil, fmt.Errorf("unable to get drand relay addr from sync service: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// if we're the leader, publish the config to the sync service
|
|
if isLeader {
|
|
buf := bytes.Buffer{}
|
|
if err := info.ToJSON(&buf); err != nil {
|
|
return nil, fmt.Errorf("error marshaling chain info: %w", err)
|
|
}
|
|
cfg := DrandRuntimeInfo{
|
|
Config: dtypes.DrandConfig{
|
|
Servers: publicAddrs,
|
|
ChainInfoJSON: buf.String(),
|
|
},
|
|
GossipBootstrap: relayAddrs,
|
|
}
|
|
t.DebugSpew("publishing drand config on sync topic: %v", cfg)
|
|
t.SyncClient.MustPublish(ctx, DrandConfigTopic, &cfg)
|
|
}
|
|
|
|
// signal ready state
|
|
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
|
|
return &dr, nil
|
|
}
|
|
|
|
// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node
|
|
// you can use the returned dtypes.DrandConfig to override the default production config.
|
|
func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) {
|
|
ch := make(chan *DrandRuntimeInfo, 1)
|
|
sub := client.MustSubscribe(ctx, DrandConfigTopic, ch)
|
|
select {
|
|
case cfg := <-ch:
|
|
return cfg, nil
|
|
case err := <-sub.Done():
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) {
|
|
for _, a := range addrs {
|
|
if ip, _ := a.ValueForProtocol(ma.P_IP4); ip != dataIP.String() {
|
|
continue
|
|
}
|
|
return peer.AddrInfoFromP2pAddr(a)
|
|
}
|
|
return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs)
|
|
}
|
|
|
|
func getLogLevel(t *TestEnvironment) int {
|
|
switch t.StringParam("drand_log_level") {
|
|
case "info":
|
|
return log.LogInfo
|
|
case "debug":
|
|
return log.LogDebug
|
|
default:
|
|
return log.LogNone
|
|
}
|
|
}
|