refactor drand node and implement halting

This commit is contained in:
Yusef Napora 2020-06-25 17:21:49 -04:00
parent 8469f67a38
commit 0b386f21d5
3 changed files with 220 additions and 23 deletions

View File

@ -0,0 +1,76 @@
[metadata]
name = "lotus-soup"
author = ""
[global]
plan = "lotus-soup"
case = "lotus-baseline"
total_instances = 6
builder = "docker:go"
runner = "local:docker"
[[groups]]
id = "bootstrapper"
[groups.resources]
memory = "120Mi"
cpu = "10m"
[groups.instances]
count = 1
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "bootstrapper"
clients = "1"
miners = "1"
balance = "2000"
sectors = "10"
random_beacon_type = "local-drand"
[[groups]]
id = "miners"
[groups.resources]
memory = "120Mi"
cpu = "10m"
[groups.instances]
count = 1
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "miner"
clients = "1"
miners = "1"
balance = "2000"
sectors = "10"
random_beacon_type = "local-drand"
[[groups]]
id = "clients"
[groups.resources]
memory = "120Mi"
cpu = "10m"
[groups.instances]
count = 1
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "client"
clients = "1"
miners = "1"
balance = "2000"
sectors = "10"
random_beacon_type = "local-drand"
[[groups]]
id = "drand"
[groups.instances]
count = 3
percentage = 0.0
[groups.run]
[groups.run.test_params]
role = "drand"
drand_period = "1s"
drand_halt_duration = "20s"
drand_halt_begin = "10s"
drand_log_level = "info"

View File

@ -38,6 +38,9 @@ instances = { min = 1, max = 100, default = 5 }
drand_period = { type = "duration", default="10s" } drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 } drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true } drand_gossip_relay = { type = "bool", default = true }
drand_halt_duration = { type = "duration", default="0", desc = "how long to halt drand chain before resuming" }
drand_halt_begin = { type = "duration", default="1m", desc = "when to start drand halting (relative to test start time)"}
drand_log_level = { type = "string", default="info" }
# Params relevant to pubsub tracing # Params relevant to pubsub tracing
enable_pubsub_tracer = { type = "bool", default = false } enable_pubsub_tracer = { type = "bool", default = false }

View File

@ -14,26 +14,122 @@ import (
"github.com/drand/drand/chain" "github.com/drand/drand/chain"
hclient "github.com/drand/drand/client/http" hclient "github.com/drand/drand/client/http"
"github.com/drand/drand/demo/node" "github.com/drand/drand/core"
"github.com/drand/drand/key"
"github.com/drand/drand/log" "github.com/drand/drand/log"
"github.com/drand/drand/lp2p" "github.com/drand/drand/lp2p"
dnet "github.com/drand/drand/net"
"github.com/drand/drand/protobuf/drand"
dtest "github.com/drand/drand/test"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/sync" "github.com/testground/sdk-go/sync"
) )
var PrepareDrandTimeout = time.Minute var (
PrepareDrandTimeout = time.Minute
secretDKG = "dkgsecret"
)
type DrandInstance struct { type DrandInstance struct {
Node node.Node daemon *core.Drand
GossipRelay *lp2p.GossipRelayNode ctrlClient *dnet.ControlClient
gossipRelay *lp2p.GossipRelayNode
t *TestEnvironment
stateDir string stateDir string
priv *key.Pair
pubAddr string
privAddr string
ctrlAddr string
} }
func (d *DrandInstance) Cleanup() error { func (dr *DrandInstance) Start() error {
return os.RemoveAll(d.stateDir) opts := []core.ConfigOption{
core.WithLogLevel(getLogLevel(dr.t)),
core.WithConfigFolder(dr.stateDir),
core.WithPublicListenAddress(dr.pubAddr),
core.WithPrivateListenAddress(dr.privAddr),
core.WithControlPort(dr.ctrlAddr),
core.WithInsecure(),
}
conf := core.NewConfig(opts...)
fs := key.NewFileStore(conf.ConfigFolder())
fs.SaveKeyPair(dr.priv)
key.Save(path.Join(dr.stateDir, "public.toml"), dr.priv.Public, false)
if dr.daemon == nil {
drand, err := core.NewDrand(fs, conf)
if err != nil {
return err
}
dr.daemon = drand
} else {
drand, err := core.LoadDrand(fs, conf)
if err != nil {
return err
}
drand.StartBeacon(true)
dr.daemon = drand
}
return nil
}
// Ping reports whether the daemon's control endpoint is reachable.
func (dr *DrandInstance) Ping() bool {
	cl := dr.ctrl()
	if cl == nil {
		// ctrl() already logged the failure; a missing client means the
		// daemon is not reachable. (The original would nil-deref here.)
		return false
	}
	return cl.Ping() == nil
}
// Close shuts down the gossip relay and the daemon (when they exist) and
// removes the instance's on-disk state directory.
//
// The gossip relay is only constructed when the drand_gossip_relay param is
// true, and the daemon may never have been started; both must be nil-checked
// or Close panics on a partially-initialized instance.
func (dr *DrandInstance) Close() error {
	if dr.gossipRelay != nil {
		dr.gossipRelay.Shutdown()
	}
	if dr.daemon != nil {
		dr.daemon.Stop(context.Background())
	}
	return os.RemoveAll(dr.stateDir)
}
// ctrl lazily constructs, caches, and returns a control client for the
// daemon. It returns nil when the client cannot be created; the error is
// recorded in the test output.
func (dr *DrandInstance) ctrl() *dnet.ControlClient {
	if dr.ctrlClient != nil {
		return dr.ctrlClient
	}
	cl, err := dnet.NewControlClient(dr.ctrlAddr)
	if err != nil {
		// RecordMessage is Printf-style; %w is only meaningful in fmt.Errorf
		dr.t.RecordMessage("drand can't instantiate control client: %s", err)
		return nil
	}
	dr.ctrlClient = cl
	return cl
}
// RunDKG runs the distributed key generation ceremony, either as the leader
// (initiating with nodes/thr/beaconOffset) or as a follower joining the
// leader at leaderAddr. It returns the resulting group, or nil on any
// failure (the failure is recorded in the test output).
func (dr *DrandInstance) RunDKG(nodes, thr int, timeout string, leader bool, leaderAddr string, beaconOffset int) *key.Group {
	cl := dr.ctrl()
	if cl == nil {
		// ctrl() already logged why the control client is unavailable
		return nil
	}
	p := dr.t.DurationParam("drand_period")
	t, err := time.ParseDuration(timeout)
	if err != nil {
		// the original discarded this error, silently running with a zero timeout
		dr.t.RecordMessage("drand dkg: invalid timeout %q: %s", timeout, err)
		return nil
	}
	var grp *drand.GroupPacket
	if leader {
		grp, err = cl.InitDKGLeader(nodes, thr, p, t, nil, secretDKG, beaconOffset)
	} else {
		l := dnet.CreatePeer(leaderAddr, false)
		grp, err = cl.InitDKG(l, nil, secretDKG)
	}
	if err != nil {
		// RecordMessage is Printf-style; use %s rather than %w
		dr.t.RecordMessage("drand dkg run failed: %s", err)
		return nil
	}
	kg, err := key.GroupFromProto(grp)
	if err != nil {
		// the original ignored this error and could return a nil group as success
		dr.t.RecordMessage("drand dkg: invalid group packet: %s", err)
		return nil
	}
	return kg
}
// Halt stops this node's beacon immediately and schedules it to resume
// after the given duration, simulating a temporary drand outage.
func (dr *DrandInstance) Halt(duration time.Duration) {
	dr.t.RecordMessage("drand node %d halting for %s", dr.t.GroupSeq, duration.String())
	dr.daemon.StopBeacon()
	// resume asynchronously; AfterFunc runs the callback in its own goroutine
	time.AfterFunc(duration, func() {
		dr.t.RecordMessage("drand node %d coming back online", dr.t.GroupSeq)
		// catchup=true so the beacon syncs missed rounds from peers —
		// TODO(review): confirm StartBeacon's bool means catchup here
		dr.daemon.StartBeacon(true)
	})
} }
func runDrandNode(t *TestEnvironment) error { func runDrandNode(t *TestEnvironment) error {
@ -42,10 +138,20 @@ func runDrandNode(t *TestEnvironment) error {
if err != nil { if err != nil {
return err return err
} }
defer dr.Cleanup() defer dr.Close()
// TODO add ability to halt / recover on demand // TODO add ability to halt / recover on demand
ctx := context.Background() ctx := context.Background()
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
haltDuration := t.DurationParam("drand_halt_duration")
if haltDuration != 0 {
startTime := t.DurationParam("drand_halt_begin")
time.AfterFunc(startTime, func() {
dr.Halt(haltDuration)
})
}
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)
return nil return nil
} }
@ -64,19 +170,24 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
nNodes := t.TestGroupInstanceCount nNodes := t.TestGroupInstanceCount
myAddr := t.NetClient.MustGetDataNetworkIP() myAddr := t.NetClient.MustGetDataNetworkIP()
period := t.DurationParam("drand_period")
threshold := t.IntParam("drand_threshold") threshold := t.IntParam("drand_threshold")
runGossipRelay := t.BooleanParam("drand_gossip_relay") runGossipRelay := t.BooleanParam("drand_gossip_relay")
beaconOffset := 3 beaconOffset := 3
stateDir, err := ioutil.TempDir("", fmt.Sprintf("drand-%d", t.GroupSeq)) stateDir, err := ioutil.TempDir("/tmp", fmt.Sprintf("drand-%d", t.GroupSeq))
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(maybe): use TLS? dr := DrandInstance{
n := node.NewLocalNode(int(seq), period.String(), stateDir, false, myAddr.String()) t: t,
stateDir: stateDir,
pubAddr: dtest.FreeBind(myAddr.String()),
privAddr: dtest.FreeBind(myAddr.String()),
ctrlAddr: dtest.FreeBind("localhost"),
}
dr.priv = key.NewKeyPair(dr.privAddr)
// share the node addresses with other nodes // share the node addresses with other nodes
// TODO: if we implement TLS, this is where we'd share public TLS keys // TODO: if we implement TLS, this is where we'd share public TLS keys
@ -90,8 +201,8 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
var leaderAddr string var leaderAddr string
ch := make(chan *NodeAddr) ch := make(chan *NodeAddr)
_, sub := t.SyncClient.MustPublishSubscribe(ctx, addrTopic, &NodeAddr{ _, sub := t.SyncClient.MustPublishSubscribe(ctx, addrTopic, &NodeAddr{
PrivateAddr: n.PrivateAddr(), PrivateAddr: dr.privAddr,
PublicAddr: n.PublicAddr(), PublicAddr: dr.pubAddr,
IsLeader: isLeader, IsLeader: isLeader,
}, ch) }, ch)
for i := 0; i < nNodes; i++ { for i := 0; i < nNodes; i++ {
@ -111,14 +222,14 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
t.SyncClient.MustSignalAndWait(ctx, "drand-start", nNodes) t.SyncClient.MustSignalAndWait(ctx, "drand-start", nNodes)
t.RecordMessage("Starting drand sharing ceremony") t.RecordMessage("Starting drand sharing ceremony")
if err := n.Start(stateDir); err != nil { if err := dr.Start(); err != nil {
return nil, err return nil, err
} }
alive := false alive := false
waitSecs := 10 waitSecs := 10
for i := 0; i < waitSecs; i++ { for i := 0; i < waitSecs; i++ {
if !n.Ping() { if !dr.Ping() {
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
@ -135,7 +246,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
if !isLeader { if !isLeader {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
grp := n.RunDKG(nNodes, threshold, period.String(), isLeader, leaderAddr, beaconOffset) grp := dr.RunDKG(nNodes, threshold, "10s", isLeader, leaderAddr, beaconOffset)
if grp == nil { if grp == nil {
return nil, fmt.Errorf("drand dkg failed") return nil, fmt.Errorf("drand dkg failed")
} }
@ -149,7 +260,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
t.RecordMessage("drand beacon chain started, fetching initial round via http") t.RecordMessage("drand beacon chain started, fetching initial round via http")
// verify that we can get a round of randomness from the chain using an http client // verify that we can get a round of randomness from the chain using an http client
info := chain.NewChainInfo(grp) info := chain.NewChainInfo(grp)
myPublicAddr := fmt.Sprintf("http://%s", n.PublicAddr()) myPublicAddr := fmt.Sprintf("http://%s", dr.pubAddr)
client, err := hclient.NewWithInfo(myPublicAddr, info, nil) client, err := hclient.NewWithInfo(myPublicAddr, info, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create drand http client: %w", err) return nil, fmt.Errorf("unable to create drand http client: %w", err)
@ -176,7 +287,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
Client: client, Client: client,
} }
t.RecordMessage("starting drand gossip relay") t.RecordMessage("starting drand gossip relay")
gossipRelay, err = lp2p.NewGossipRelayNode(log.DefaultLogger, &relayCfg) gossipRelay, err = lp2p.NewGossipRelayNode(log.NewLogger(getLogLevel(t)), &relayCfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to construct drand gossip relay: %w", err) return nil, fmt.Errorf("failed to construct drand gossip relay: %w", err)
} }
@ -219,11 +330,7 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) {
t.SyncClient.MustPublish(ctx, drandConfigTopic, &cfg) t.SyncClient.MustPublish(ctx, drandConfigTopic, &cfg)
} }
return &DrandInstance{ return &dr, nil
Node: n,
GossipRelay: gossipRelay,
stateDir: stateDir,
}, nil
} }
// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node // waitForDrandConfig should be called by filecoin instances before constructing the lotus Node
@ -248,3 +355,14 @@ func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error)
} }
return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs) return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs)
} }
// getLogLevel translates the "drand_log_level" test parameter into a drand
// log level constant. Any unrecognized value disables logging entirely.
func getLogLevel(t *TestEnvironment) int {
	levels := map[string]int{
		"info":  log.LogInfo,
		"debug": log.LogDebug,
	}
	if lvl, ok := levels[t.StringParam("drand_log_level")]; ok {
		return lvl
	}
	return log.LogNone
}