diff --git a/lotus-soup/testkit/role_drand.go b/lotus-soup/testkit/role_drand.go index ec285b9f9..0376b4132 100644 --- a/lotus-soup/testkit/role_drand.go +++ b/lotus-soup/testkit/role_drand.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" "fmt" "io/ioutil" "net" @@ -13,70 +12,198 @@ import ( "time" "github.com/drand/drand/chain" + "github.com/drand/drand/client" hclient "github.com/drand/drand/client/http" - "github.com/drand/drand/demo/node" + "github.com/drand/drand/core" + "github.com/drand/drand/key" "github.com/drand/drand/log" "github.com/drand/drand/lp2p" + dnet "github.com/drand/drand/net" + "github.com/drand/drand/protobuf/drand" + dtest "github.com/drand/drand/test" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/testground/sdk-go/sync" + + "github.com/filecoin-project/oni/lotus-soup/statemachine" ) -var PrepareDrandTimeout = time.Minute +var ( + PrepareDrandTimeout = time.Minute + secretDKG = "dkgsecret" +) type DrandInstance struct { - t *TestEnvironment - - Node node.Node - GossipRelay *lp2p.GossipRelayNode + daemon *core.Drand + httpClient client.Client + ctrlClient *dnet.ControlClient + gossipRelay *lp2p.GossipRelayNode + t *TestEnvironment stateDir string + priv *key.Pair + pubAddr string + privAddr string + ctrlAddr string } -func (d *DrandInstance) Cleanup() error { - return os.RemoveAll(d.stateDir) -} - -func (d *DrandInstance) RunDefault() error { - d.t.RecordMessage("running drand node") - defer d.Cleanup() - - // TODO add ability to halt / recover on demand - d.t.WaitUntilAllDone() +func (dr *DrandInstance) Start() error { + opts := []core.ConfigOption{ + core.WithLogLevel(getLogLevel(dr.t)), + core.WithConfigFolder(dr.stateDir), + core.WithPublicListenAddress(dr.pubAddr), + core.WithPrivateListenAddress(dr.privAddr), + core.WithControlPort(dr.ctrlAddr), + core.WithInsecure(), + } + conf := core.NewConfig(opts...) + fs := key.NewFileStore(conf.ConfigFolder()) + fs.SaveKeyPair(dr.priv) + key.Save(path.Join(dr.stateDir, "public.toml"), dr.priv.Public, false) + if dr.daemon == nil { + drand, err := core.NewDrand(fs, conf) + if err != nil { + return err + } + dr.daemon = drand + } else { + drand, err := core.LoadDrand(fs, conf) + if err != nil { + return err + } + drand.StartBeacon(true) + dr.daemon = drand + } return nil } -// PrepareDrandInstance starts a drand instance and runs a DKG with the other -// members of the composition group. -// -// Once the chain is running, the leader publishes the chain info needed by -// lotus nodes on DrandConfigTopic. +func (dr *DrandInstance) Ping() bool { + cl := dr.ctrl() + if err := cl.Ping(); err != nil { + return false + } + return true +} + +func (dr *DrandInstance) Close() error { + dr.gossipRelay.Shutdown() + dr.daemon.Stop(context.Background()) + return os.RemoveAll(dr.stateDir) +} + +func (dr *DrandInstance) ctrl() *dnet.ControlClient { + if dr.ctrlClient != nil { + return dr.ctrlClient + } + cl, err := dnet.NewControlClient(dr.ctrlAddr) + if err != nil { + dr.t.RecordMessage("drand can't instantiate control client: %w", err) + return nil + } + dr.ctrlClient = cl + return cl +} + +func (dr *DrandInstance) RunDKG(nodes, thr int, timeout string, leader bool, leaderAddr string, beaconOffset int) *key.Group { + cl := dr.ctrl() + p := dr.t.DurationParam("drand_period") + t, _ := time.ParseDuration(timeout) + var grp *drand.GroupPacket + var err error + if leader { + grp, err = cl.InitDKGLeader(nodes, thr, p, t, nil, secretDKG, beaconOffset) + } else { + leader := dnet.CreatePeer(leaderAddr, false) + grp, err = cl.InitDKG(leader, nil, secretDKG) + } + if err != nil { + dr.t.RecordMessage("drand dkg run failed: %w", err) + return nil + } + kg, _ := key.GroupFromProto(grp) + return kg +} + +func (dr *DrandInstance) Halt() { + dr.t.RecordMessage("drand node #%d halting", dr.t.GroupSeq) + dr.daemon.StopBeacon() +} + +func (dr *DrandInstance) Resume() { + dr.t.RecordMessage("drand node #%d resuming", dr.t.GroupSeq) + dr.daemon.StartBeacon(true) + // block until we can fetch the round corresponding to the current time + startTime := time.Now() + round := dr.httpClient.RoundAt(startTime) + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + done := make(chan struct{}, 1) + go func() { + for { + res, err := dr.httpClient.Get(ctx, round) + if err == nil { + dr.t.RecordMessage("drand chain caught up to round %d", res.Round()) + done <- struct{}{} + return + } + } + }() + + select { + case <-ctx.Done(): + dr.t.RecordMessage("drand chain failed to catch up after %s", timeout.String()) + case <-done: + dr.t.RecordMessage("drand chain resumed after %s catchup time", time.Since(startTime)) + } +} + +func (dr *DrandInstance) RunDefault() error { + dr.t.RecordMessage("running drand node") + + if dr.t.IsParamSet("suspend_events") { + suspender := statemachine.NewSuspender(dr, dr.t.RecordMessage) + suspender.RunEvents(dr.t.StringParam("suspend_events")) + } + + dr.t.WaitUntilAllDone() + return nil +} + +// prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group. +// Once the chain is running, the leader publishes the chain info needed by lotus nodes on +// drandConfigTopic func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { - var ( - startTime = time.Now() - seq = t.GroupSeq - isLeader = seq == 1 - nNodes = t.TestGroupInstanceCount - - myAddr = t.NetClient.MustGetDataNetworkIP() - period = t.DurationParam("drand_period") - threshold = t.IntParam("drand_threshold") - runGossipRelay = t.BooleanParam("drand_gossip_relay") - - beaconOffset = 3 - ) - ctx, cancel := context.WithTimeout(context.Background(), PrepareDrandTimeout) defer cancel() - stateDir, err := ioutil.TempDir("", fmt.Sprintf("drand-%d", t.GroupSeq)) + startTime := time.Now() + + seq := t.GroupSeq + isLeader := seq == 1 + nNodes := t.TestGroupInstanceCount + + myAddr := t.NetClient.MustGetDataNetworkIP() + threshold := t.IntParam("drand_threshold") + runGossipRelay := t.BooleanParam("drand_gossip_relay") + + beaconOffset := 3 + + stateDir, err := ioutil.TempDir("/tmp", fmt.Sprintf("drand-%d", t.GroupSeq)) if err != nil { return nil, err } - // TODO(maybe): use TLS? - n := node.NewLocalNode(int(seq), period.String(), stateDir, false, myAddr.String()) + dr := DrandInstance{ + t: t, + stateDir: stateDir, + pubAddr: dtest.FreeBind(myAddr.String()), + privAddr: dtest.FreeBind(myAddr.String()), + ctrlAddr: dtest.FreeBind("localhost"), + } + dr.priv = key.NewKeyPair(dr.privAddr) // share the node addresses with other nodes // TODO: if we implement TLS, this is where we'd share public TLS keys @@ -85,17 +212,15 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { PublicAddr string IsLeader bool } - addrTopic := sync.NewTopic("drand-addrs", &NodeAddr{}) var publicAddrs []string var leaderAddr string ch := make(chan *NodeAddr) _, sub := t.SyncClient.MustPublishSubscribe(ctx, addrTopic, &NodeAddr{ - PrivateAddr: n.PrivateAddr(), - PublicAddr: n.PublicAddr(), + PrivateAddr: dr.privAddr, + PublicAddr: dr.pubAddr, IsLeader: isLeader, }, ch) - for i := 0; i < nNodes; i++ { select { case msg := <-ch: @@ -107,21 +232,20 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { return nil, fmt.Errorf("unable to read drand addrs from sync service: %w", err) } } - if leaderAddr == "" { return nil, fmt.Errorf("got %d drand addrs, but no leader", len(publicAddrs)) } t.SyncClient.MustSignalAndWait(ctx, "drand-start", nNodes) t.RecordMessage("Starting drand sharing ceremony") - if err := n.Start(stateDir); err != nil { + if err := dr.Start(); err != nil { return nil, err } alive := false waitSecs := 10 for i := 0; i < waitSecs; i++ { - if !n.Ping() { + if !dr.Ping() { time.Sleep(time.Second) continue } @@ -138,7 +262,7 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { if !isLeader { time.Sleep(time.Second) } - grp := n.RunDKG(nNodes, threshold, period.String(), isLeader, leaderAddr, beaconOffset) + grp := dr.RunDKG(nNodes, threshold, "10s", isLeader, leaderAddr, beaconOffset) if grp == nil { return nil, fmt.Errorf("drand dkg failed") } @@ -152,19 +276,18 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { t.RecordMessage("drand beacon chain started, fetching initial round via http") // verify that we can get a round of randomness from the chain using an http client info := chain.NewChainInfo(grp) - myPublicAddr := fmt.Sprintf("http://%s", n.PublicAddr()) - client, err := hclient.NewWithInfo(myPublicAddr, info, nil) + myPublicAddr := fmt.Sprintf("http://%s", dr.pubAddr) + dr.httpClient, err = hclient.NewWithInfo(myPublicAddr, info, nil) if err != nil { return nil, fmt.Errorf("unable to create drand http client: %w", err) } - _, err = client.Get(ctx, 1) + _, err = dr.httpClient.Get(ctx, 1) if err != nil { return nil, fmt.Errorf("unable to get initial drand round: %w", err) } // start gossip relay (unless disabled via testplan parameter) - var gossipRelay *lp2p.GossipRelayNode var relayAddrs []peer.AddrInfo if runGossipRelay { @@ -176,17 +299,17 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { DataDir: gossipDir, IdentityPath: path.Join(gossipDir, "identity.key"), Insecure: true, - Client: client, + Client: dr.httpClient, } t.RecordMessage("starting drand gossip relay") - gossipRelay, err = lp2p.NewGossipRelayNode(log.DefaultLogger, &relayCfg) + dr.gossipRelay, err = lp2p.NewGossipRelayNode(log.NewLogger(getLogLevel(t)), &relayCfg) if err != nil { return nil, fmt.Errorf("failed to construct drand gossip relay: %w", err) } t.RecordMessage("sharing gossip relay addrs") // share the gossip relay addrs so we can publish them in DrandRuntimeInfo - relayInfo, err := relayAddrInfo(gossipRelay.Multiaddrs(), myAddr) + relayInfo, err := relayAddrInfo(dr.gossipRelay.Multiaddrs(), myAddr) if err != nil { return nil, err } @@ -217,17 +340,13 @@ func PrepareDrandInstance(t *TestEnvironment) (*DrandInstance, error) { }, GossipBootstrap: relayAddrs, } - dump, _ := json.Marshal(cfg) - t.RecordMessage("publishing drand config on sync topic: %s", string(dump)) + t.DebugSpew("publishing drand config on sync topic: %v", cfg) t.SyncClient.MustPublish(ctx, DrandConfigTopic, &cfg) } - return &DrandInstance{ - t: t, - Node: n, - GossipRelay: gossipRelay, - stateDir: stateDir, - }, nil + // signal ready state + t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount) + return &dr, nil } // waitForDrandConfig should be called by filecoin instances before constructing the lotus Node @@ -252,3 +371,14 @@ func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) } return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs) } + +func getLogLevel(t *TestEnvironment) int { + switch t.StringParam("drand_log_level") { + case "info": + return log.LogInfo + case "debug": + return log.LogDebug + default: + return log.LogNone + } +}