diff --git a/lotus-soup/compositions/composition-paych-stress.toml b/lotus-soup/compositions/composition-paych-stress.toml new file mode 100644 index 000000000..db0f7411c --- /dev/null +++ b/lotus-soup/compositions/composition-paych-stress.toml @@ -0,0 +1,49 @@ +[metadata] + name = "lotus-soup" + author = "raulk" + +[global] + plan = "lotus-soup" + case = "paych-stress" + total_instances = 5 # 2 clients + 2 miners + 1 bootstrapper + builder = "exec:go" + runner = "local:exec" + +[global.build_config] + enable_go_build_cache = true + +[global.run_config] + exposed_ports = ["6060", "1234", "2345"] + +[global.build] + selectors = ["testground"] + +[global.run.test_params] + clients = "2" + miners = "2" + genesis_timestamp_offset = "0" + balance = "30" ## be careful, this is in FIL. + sectors = "10" + random_beacon_type = "mock" + mining_mode = "natural" + lane_count = "8" + increments = "30" ## in FIL + +[[groups]] + id = "bootstrapper" + instances = { count = 1 } + [groups.run.test_params] + role = "bootstrapper" + +[[groups]] + id = "miners" + instances = { count = 2 } + [groups.run.test_params] + role = "miner" + +[[groups]] + id = "clients" + # the first client will be on the receiving end; all others will be on the sending end. + instances = { count = 2 } + [groups.run.test_params] + role = "client" diff --git a/lotus-soup/compositions/composition.toml b/lotus-soup/compositions/composition.toml index c886ffa74..2ad37945c 100644 --- a/lotus-soup/compositions/composition.toml +++ b/lotus-soup/compositions/composition.toml @@ -22,7 +22,7 @@ clients = "3" miners = "2" genesis_timestamp_offset = "100000" - balance = "2000000000" + balance = "2000000000" ## be careful, this is in FIL. 
sectors = "10" random_beacon_type = "mock" diff --git a/lotus-soup/deals_e2e.go b/lotus-soup/deals_e2e.go index 74a2f6c14..e968d9145 100644 --- a/lotus-soup/deals_e2e.go +++ b/lotus-soup/deals_e2e.go @@ -50,12 +50,12 @@ func dealsE2E(t *testkit.TestEnvironment) error { // select a random miner minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))] - if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil { + if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil { return err } - t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1) + t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.MinerActorAddr)).Inc(1) - t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr) + t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr) time.Sleep(2 * time.Second) @@ -82,7 +82,7 @@ func dealsE2E(t *testkit.TestEnvironment) error { // start deal t1 := time.Now() - deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, fcid) + deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, fcid) t.RecordMessage("started deal: %s", deal) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this diff --git a/lotus-soup/deals_stress.go b/lotus-soup/deals_stress.go index 108fdd15a..7285ec25a 100644 --- a/lotus-soup/deals_stress.go +++ b/lotus-soup/deals_stress.go @@ -33,11 +33,11 @@ func dealStressTest(t *testkit.TestEnvironment) error { // select a random miner minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))] - if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil { + if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil { return err } - t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr) + t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr) time.Sleep(2 * time.Second) @@ -92,12 +92,12 @@ func dealStressTest(t *testkit.TestEnvironment) error { go func(i int) { defer 
wg1.Done() t1 := time.Now() - deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, cids[i]) + deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i]) t.RecordMessage("started storage deal %d -> %s", i, deal) time.Sleep(2 * time.Second) t.RecordMessage("waiting for deal %d to be sealed", i) testkit.WaitDealSealed(t, ctx, client, deal) - t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.ActorAddr)).Update(int64(time.Since(t1))) + t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.MinerActorAddr)).Update(int64(time.Since(t1))) }(i) } t.RecordMessage("waiting for all deals to be sealed") @@ -123,7 +123,7 @@ func dealStressTest(t *testkit.TestEnvironment) error { } else { for i := 0; i < deals; i++ { - deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, cids[i]) + deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i]) t.RecordMessage("started storage deal %d -> %s", i, deal) time.Sleep(2 * time.Second) t.RecordMessage("waiting for deal %d to be sealed", i) diff --git a/lotus-soup/go.mod b/lotus-soup/go.mod index b8334ec5d..03e5b0f98 100644 --- a/lotus-soup/go.mod +++ b/lotus-soup/go.mod @@ -3,6 +3,7 @@ module github.com/filecoin-project/oni/lotus-soup go 1.14 require ( + contrib.go.opencensus.io/exporter/prometheus v0.1.0 github.com/davecgh/go-spew v1.1.1 github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4 github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef diff --git a/lotus-soup/go.sum b/lotus-soup/go.sum index 0695c1d55..015f0b9ea 100644 --- a/lotus-soup/go.sum +++ b/lotus-soup/go.sum @@ -27,6 +27,7 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= contrib.go.opencensus.io/exporter/jaeger v0.1.0/go.mod h1:VYianECmuFPwU37O699Vc1GOcy+y8kOsfaxHRImmjbA= 
+contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg= contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -245,10 +246,6 @@ github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIi github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= -github.com/filecoin-project/lotus v0.4.2-0.20200706092412-516e31d37cd7 h1:eE3a712/0rcnml1lqwtGHYz9JrX4vLqBk9yBdYtH9Jo= -github.com/filecoin-project/lotus v0.4.2-0.20200706092412-516e31d37cd7/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI= -github.com/filecoin-project/lotus v0.4.2-0.20200706153752-f8b65d391143 h1:03PUHBPtjNmxdPbOL258pXLVE4Dz3xjsY86THGV1UQQ= -github.com/filecoin-project/lotus v0.4.2-0.20200706153752-f8b65d391143/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI= github.com/filecoin-project/lotus v0.4.2-0.20200706172415-cf6ac44b6ec5 h1:wsEkRwhcWvaZowowC2Kj9ueJ2vIRDqOxOcFvqqgHdxE= github.com/filecoin-project/lotus v0.4.2-0.20200706172415-cf6ac44b6ec5/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI= github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM= diff --git a/lotus-soup/main.go b/lotus-soup/main.go index 3b36cf207..eb9dd4b09 100644 --- a/lotus-soup/main.go +++ b/lotus-soup/main.go @@ -1,22 +1,54 @@ package main import ( - 
"github.com/testground/sdk-go/run" + "os" + "github.com/filecoin-project/oni/lotus-soup/paych" "github.com/filecoin-project/oni/lotus-soup/testkit" "github.com/filecoin-project/lotus/build" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/builtin/verifreg" + + "github.com/testground/sdk-go/run" + + logging "github.com/ipfs/go-log/v2" ) var cases = map[string]interface{}{ "deals-e2e": testkit.WrapTestEnvironment(dealsE2E), "deals-stress-test": testkit.WrapTestEnvironment(dealStressTest), "drand-halting": testkit.WrapTestEnvironment(dealsE2E), + "paych-stress": testkit.WrapTestEnvironment(paych.Stress), } func init() { build.BlockDelaySecs = 2 build.PropagationDelaySecs = 4 + + _ = logging.SetLogLevel("*", "WARN") + _ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR") // noisy + _ = logging.SetLogLevel("bitswap", "ERROR") // noisy + + _ = os.Setenv("BELLMAN_NO_GPU", "1") + + build.InsecurePoStValidation = true + build.DisableBuiltinAssets = true + + // MessageConfidence is the amount of tipsets we wait after a message is + // mined, e.g. payment channel creation, to be considered committed. 
+ build.MessageConfidence = 1 + + power.ConsensusMinerMinPower = big.NewInt(2048) + miner.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{ + abi.RegisteredSealProof_StackedDrg2KiBV1: {}, + } + verifreg.MinVerifiedDealSize = big.NewInt(256) + } func main() { diff --git a/lotus-soup/manifest.toml b/lotus-soup/manifest.toml index bcdba7de2..e76b1d396 100644 --- a/lotus-soup/manifest.toml +++ b/lotus-soup/manifest.toml @@ -22,6 +22,12 @@ enabled = true [runners."cluster:k8s"] enabled = true +###################### +## +## Testcases +## +###################### + [[testcases]] name = "deals-e2e" instances = { min = 1, max = 100, default = 5 } @@ -50,6 +56,7 @@ instances = { min = 1, max = 100, default = 5 } enable_pubsub_tracer = { type = "bool", default = false } mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] } + [[testcases]] name = "drand-halting" instances = { min = 1, max = 100, default = 5 } @@ -79,6 +86,7 @@ instances = { min = 1, max = 100, default = 5 } enable_pubsub_tracer = { type = "bool", default = false } # Mining Mode: synchronized -vs- natural time mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] } + [[testcases]] name = "deals-stress-test" instances = { min = 1, max = 100, default = 5 } @@ -110,3 +118,36 @@ instances = { min = 1, max = 100, default = 5 } deals = { type = "int", default = 1 } deal_mode = { type = "enum", default = "serial", options = ["serial", "concurrent"] } + + +[[testcases]] +name = "paych-stress" +instances = { min = 1, max = 100, default = 5 } + + [testcases.params] + clients = { type = "int", default = 1 } + miners = { type = "int", default = 1 } + balance = { type = "int", default = 1 } + sectors = { type = "int", default = 1 } + role = { type = "string" } + genesis_timestamp_offset = { type = "int", default = 0 } + + random_beacon_type = { type = "enum", default = "local-drand", options = ["mock", "local-drand", 
"external-drand"] } + + # Params relevant to drand nodes. drand nodes should have role="drand", and must all be + # in the same composition group. There must be at least threshold drand nodes. + # To get lotus nodes to actually use the drand nodes, you must set random_beacon_type="local-drand" + # for the lotus node groups. + drand_period = { type = "duration", default="10s" } + drand_threshold = { type = "int", default = 2 } + drand_gossip_relay = { type = "bool", default = true } + drand_log_level = { type = "string", default="info" } + suspend_events = { type = "string", default="", desc = "a sequence of halt/resume/wait events separated by '->'" } + + # Params relevant to pubsub tracing + enable_pubsub_tracer = { type = "bool", default = false } # Mining Mode: synchronized -vs- natural time + mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] } + + # ********** Test-case specific ********** + increments = { type = "int", default = "100", desc = "increments in which to send payment vouchers" } + lane_count = { type = "int", default = "256", desc = "lanes to open; vouchers will be distributed across these lanes in round-robin fashion" } \ No newline at end of file diff --git a/lotus-soup/paych/README.md b/lotus-soup/paych/README.md new file mode 100644 index 000000000..dbd5879ed --- /dev/null +++ b/lotus-soup/paych/README.md @@ -0,0 +1,32 @@ +# Payment channels end-to-end tests + +This package contains the following test cases, each of which is described +further below. + +- Payment channels stress test case (`stress.go`). + +## Payment channels stress test case (`stress.go`) + +***WIP | blocked due to https://github.com/filecoin-project/lotus/issues/2297*** + +This test case turns all clients into payment receivers and senders. +The first member to start in the group becomes the _receiver_. +All other members become _senders_. 
+ +The _senders_ will open a single payment channel to the _receiver_, and will +wait for the message to be posted on-chain. We are setting +`build.MessageConfidence=1`, in order to accelerate the test. So we'll only wait +for a single tipset confirmation once we witness the message. + +Once the message is posted, we load the payment channel actor address and create +as many lanes as the `lane_count` test parameter dictates. + +We then fetch our total balance, and start sending it on the payment channel, +round-robinning across all lanes, until our balance is exhausted. + +**TODO:** + +- [ ] Assertions, metrics, etc. Actually gather statistics. Right now this is + just a smoke test, and it fails. +- [ ] Implement the _receiver_ logic. +- [ ] Model test lifetime by signalling end. \ No newline at end of file diff --git a/lotus-soup/paych/stress.go b/lotus-soup/paych/stress.go new file mode 100644 index 000000000..3641530e7 --- /dev/null +++ b/lotus-soup/paych/stress.go @@ -0,0 +1,160 @@ +package paych + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/testground/sdk-go/sync" + + "github.com/filecoin-project/oni/lotus-soup/testkit" +) + +var SendersDoneState = sync.State("senders-done") + +// TODO Stress is currently WIP. We found blockers in Lotus that prevent us from +// making progress. See https://github.com/filecoin-project/lotus/issues/2297. +func Stress(t *testkit.TestEnvironment) error { + // Dispatch/forward non-client roles to defaults. + if t.Role != "client" { + return testkit.HandleDefaultRole(t) + } + + // This is a client role. 
+ t.RecordMessage("running payments client") + + var ( + // lanes to open; vouchers will be distributed across these lanes in round-robin fashion + laneCount = t.IntParam("lane_count") + // increments in which to send payment vouchers + increments = big.Mul(big.NewInt(int64(t.IntParam("increments"))), abi.TokenPrecision) + ) + + ctx := context.Background() + cl, err := testkit.PrepareClient(t) + if err != nil { + return err + } + + // are we the receiver or a sender? + mode := "sender" + if t.GroupSeq == 1 { + mode = "receiver" + } + + t.RecordMessage("acting as %s", mode) + + var clients []*testkit.ClientAddressesMsg + sctx, cancel := context.WithCancel(ctx) + clientsCh := make(chan *testkit.ClientAddressesMsg) + t.SyncClient.MustSubscribe(sctx, testkit.ClientsAddrsTopic, clientsCh) + for i := 0; i < t.TestGroupInstanceCount; i++ { + clients = append(clients, <-clientsCh) + } + cancel() + + switch mode { + case "receiver": + // one receiver, everyone else is a sender. + <-t.SyncClient.MustBarrier(ctx, SendersDoneState, t.TestGroupInstanceCount-1).C + + case "sender": + // we're going to lock up all our funds into this one payment channel. 
+ recv := clients[0] + balance, err := cl.FullApi.WalletBalance(ctx, cl.Wallet.Address) + if err != nil { + return fmt.Errorf("failed to acquire wallet balance: %w", err) + } + + t.RecordMessage("my balance: %d", balance) + t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance) + + pid := os.Getpid() + t.RecordMessage("sender pid: %d", pid) + + time.Sleep(20 * time.Second) + + channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance) + if err != nil { + return fmt.Errorf("failed to create payment channel: %w", err) + } + + if addr := channel.Channel; addr != address.Undef { + return fmt.Errorf("expected an Undef channel address, got: %s", addr) + } + + t.RecordMessage("payment channel created; msg_cid=%s", channel.ChannelMessage) + t.RecordMessage("waiting for payment channel message to appear on chain") + + // wait for the channel creation message to appear on chain. + _, err = cl.FullApi.StateWaitMsg(ctx, channel.ChannelMessage, 2) + if err != nil { + return fmt.Errorf("failed while waiting for payment channel creation msg to appear on chain: %w", err) + } + + // need to wait so that the channel is tracked. + // the full API waits for build.MessageConfidence (=1 in tests) before tracking the channel. + // we wait for 2 confirmations, so we have the assurance the channel is tracked. 
+ + t.RecordMessage("reloading paych; now it should have an address") + channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero()) + if err != nil { + return fmt.Errorf("failed to reload payment channel: %w", err) + } + + t.RecordMessage("channel address: %s", channel.Channel) + t.RecordMessage("allocating lanes; count=%d", laneCount) + + // allocate as many lanes as required + var lanes []uint64 + for i := 0; i < laneCount; i++ { + lane, err := cl.FullApi.PaychAllocateLane(ctx, channel.Channel) + if err != nil { + return fmt.Errorf("failed to allocate lane: %w", err) + } + lanes = append(lanes, lane) + } + + t.RecordMessage("lanes allocated; count=%d", laneCount) + t.RecordMessage("sending payments in round-robin fashion across lanes; increments=%d", increments) + + // start sending payments + zero := big.Zero() + + Outer: + for remaining := balance; remaining.GreaterThan(zero); { + for _, lane := range lanes { + voucher, err := cl.FullApi.PaychVoucherCreate(ctx, channel.Channel, increments, lane) + if err != nil { + return fmt.Errorf("failed to create voucher: %w", err) + } + t.RecordMessage("payment voucher created; lane=%d, nonce=%d, amount=%d", voucher.Lane, voucher.Nonce, voucher.Amount) + + cid, err := cl.FullApi.PaychVoucherSubmit(ctx, channel.Channel, voucher) + if err != nil { + return fmt.Errorf("failed to submit voucher: %w", err) + } + t.RecordMessage("payment voucher submitted; msg_cid=%s, lane=%d, nonce=%d, amount=%d", cid, voucher.Lane, voucher.Nonce, voucher.Amount) + + remaining = types.BigSub(remaining, increments) + t.RecordMessage("remaining balance: %d", remaining) + if remaining.LessThanEqual(zero) { + // we have no more funds remaining. 
+ break Outer + } + } + } + + t.RecordMessage("finished sending all payment vouchers") + + t.SyncClient.MustSignalEntry(ctx, SendersDoneState) + } + + return nil + +} diff --git a/lotus-soup/testkit/node.go b/lotus-soup/testkit/node.go index 75f14b9d6..4ce4d45a7 100644 --- a/lotus-soup/testkit/node.go +++ b/lotus-soup/testkit/node.go @@ -9,51 +9,28 @@ import ( "time" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules/dtypes" modtest "github.com/filecoin-project/lotus/node/modules/testing" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/abi/big" - saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner" - "github.com/filecoin-project/specs-actors/actors/builtin/power" - "github.com/filecoin-project/specs-actors/actors/builtin/verifreg" - logging "github.com/ipfs/go-log/v2" - influxdb "github.com/kpacha/opencensus-influxdb" - ma "github.com/multiformats/go-multiaddr" - tstats "github.com/filecoin-project/lotus/tools/stats" - "github.com/libp2p/go-libp2p-core/peer" - manet "github.com/multiformats/go-multiaddr-net" + "github.com/kpacha/opencensus-influxdb" + ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multiaddr-net" "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) -func init() { - _ = logging.SetLogLevel("*", "WARN") - - _ = os.Setenv("BELLMAN_NO_GPU", "1") - - build.InsecurePoStValidation = true - build.DisableBuiltinAssets = true - - power.ConsensusMinerMinPower = big.NewInt(2048) - saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{ - abi.RegisteredSealProof_StackedDrg2KiBV1: {}, - } - verifreg.MinVerifiedDealSize = big.NewInt(256) -} - var PrepareNodeTimeout = time.Minute type 
LotusNode struct { FullApi api.FullNode MinerApi api.StorageMiner StopFn node.StopFunc + Wallet *wallet.Key MineOne func(context.Context, func(bool, error)) error } @@ -68,6 +45,8 @@ func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error return err } + n.Wallet = walletKey + return nil } @@ -138,11 +117,11 @@ func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]M return addrs, nil } -func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) { - ch := make(chan peer.AddrInfo) +func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]*ClientAddressesMsg, error) { + ch := make(chan *ClientAddressesMsg) sub := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, ch) - addrs := make([]peer.AddrInfo, 0, clients) + addrs := make([]*ClientAddressesMsg, 0, clients) for i := 0; i < clients; i++ { select { case a := <-ch: diff --git a/lotus-soup/testkit/role_bootstrapper.go b/lotus-soup/testkit/role_bootstrapper.go index 4fae86236..3049a48ca 100644 --- a/lotus-soup/testkit/role_bootstrapper.go +++ b/lotus-soup/testkit/role_bootstrapper.go @@ -55,6 +55,16 @@ func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) { return nil, err } + totalBalance := big.Zero() + for _, b := range balances { + totalBalance = big.Add(big.NewInt(int64(b.Balance)), totalBalance) + } + + t.RecordMessage("TOTAL BALANCE: %s", totalBalance) + if max := types.TotalFilecoinInt; totalBalance.GreaterThanEqual(max) { + panic(fmt.Sprintf("total sum of balances is greater than max Filecoin ever; sum=%s, max=%s", totalBalance, max)) + } + // then collect all preseals from miners preseals, err := CollectPreseals(t, ctx, miners) if err != nil { @@ -66,10 +76,12 @@ func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) { var genesisMiners []genesis.Miner for _, bm := range balances { + balance := big.Mul(big.NewInt(int64(bm.Balance)), 
types.NewInt(build.FilecoinPrecision)) + t.RecordMessage("balance assigned to actor %s: %s AttoFIL", bm.Addr, balance) genesisActors = append(genesisActors, genesis.Actor{ Type: genesis.TAccount, - Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)), + Balance: balance, Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(), }) } diff --git a/lotus-soup/testkit/role_client.go b/lotus-soup/testkit/role_client.go index f98d07f65..98967f43c 100644 --- a/lotus-soup/testkit/role_client.go +++ b/lotus-soup/testkit/role_client.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "net/http" + "time" + "contrib.go.opencensus.io/exporter/prometheus" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/lotus/api" @@ -94,7 +96,10 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) { if err != nil { return nil, err } - t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, addrinfo) + t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, &ClientAddressesMsg{ + PeerAddr: addrinfo, + WalletAddr: walletKey.Address, + }) t.RecordMessage("waiting for all nodes to be ready") t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount) @@ -106,6 +111,26 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) { } t.RecordMessage("got %v miner addrs", len(addrs)) + // densely connect the client to the full node and the miners themselves. + for _, miner := range addrs { + if err := n.FullApi.NetConnect(ctx, miner.FullNetAddrs); err != nil { + return nil, fmt.Errorf("client failed to connect to full node of miner: %w", err) + } + if err := n.FullApi.NetConnect(ctx, miner.MinerNetAddrs); err != nil { + return nil, fmt.Errorf("client failed to connect to storage miner node node of miner: %w", err) + } + } + + // wait for all clients to have completed identify, pubsub negotiation with miners. 
+ time.Sleep(1 * time.Second) + + peers, err := n.FullApi.NetPeers(ctx) + if err != nil { + return nil, fmt.Errorf("failed to query connected peers: %w", err) + } + + t.RecordMessage("connected peers: %d", len(peers)) + cl := &LotusClient{ t: t, LotusNode: n, @@ -132,6 +157,15 @@ func startFullNodeAPIServer(t *TestEnvironment, repo *repo.MemRepo, api api.Full http.Handle("/rpc/v0", ah) + exporter, err := prometheus.NewExporter(prometheus.Options{ + Namespace: "lotus", + }) + if err != nil { + return err + } + + http.Handle("/debug/metrics", exporter) + srv := &http.Server{Handler: http.DefaultServeMux} endpoint, err := repo.APIEndpoint() diff --git a/lotus-soup/testkit/role_miner.go b/lotus-soup/testkit/role_miner.go index 6d9b2cc89..86047dece 100644 --- a/lotus-soup/testkit/role_miner.go +++ b/lotus-soup/testkit/role_miner.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" + "contrib.go.opencensus.io/exporter/prometheus" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" @@ -200,6 +201,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) { mineBlock := make(chan func(bool, error)) minerOpts = append(minerOpts, node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr))) + n.MineOne = func(ctx context.Context, cb func(bool, error)) error { select { case mineBlock <- cb: @@ -232,16 +234,16 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) { go collectStats(t, ctx, n.FullApi) } - // Bootstrap with full node - remoteAddrs, err := n.FullApi.NetAddrsListen(ctx) + // Start listening on the full node. 
+ fullNodeNetAddrs, err := n.FullApi.NetAddrsListen(ctx) if err != nil { panic(err) } - err = n.MinerApi.NetConnect(ctx, remoteAddrs) - if err != nil { - panic(err) - } + // err = n.MinerApi.NetConnect(ctx, fullNodeNetAddrs) + // if err != nil { + // panic(err) + // } // add local storage for presealed sectors err = n.MinerApi.StorageAddLocal(ctx, presealDir) @@ -273,31 +275,41 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) { } t.RecordMessage("publish our address to the miners addr topic") - actoraddress, err := n.MinerApi.ActorAddress(ctx) + minerActor, err := n.MinerApi.ActorAddress(ctx) if err != nil { return nil, err } - addrinfo, err := n.MinerApi.NetAddrsListen(ctx) + + minerNetAddrs, err := n.MinerApi.NetAddrsListen(ctx) if err != nil { return nil, err } - t.SyncClient.MustPublish(ctx, MinersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress}) + + t.SyncClient.MustPublish(ctx, MinersAddrsTopic, MinerAddressesMsg{ + FullNetAddrs: fullNodeNetAddrs, + MinerNetAddrs: minerNetAddrs, + MinerActorAddr: minerActor, + }) t.RecordMessage("connecting to all other miners") - // connect to all other miners. + // densely connect the miner's full nodes. minerCh := make(chan *MinerAddressesMsg, 16) sctx, cancel := context.WithCancel(ctx) defer cancel() t.SyncClient.MustSubscribe(sctx, MinersAddrsTopic, minerCh) for i := 0; i < t.IntParam("miners"); i++ { - if miner := <-minerCh; miner.ActorAddr != actoraddress { - err := n.FullApi.NetConnect(ctx, miner.PeerAddr) - if err != nil { - return nil, fmt.Errorf("failed to connect to miner %s on: %v", miner.ActorAddr, miner.PeerAddr) - } - t.RecordMessage("connected to miner %s on %v", miner.ActorAddr, miner.PeerAddr) + m := <-minerCh + if m.MinerActorAddr == minerActor { + // once I find myself, I stop connecting to others, to avoid a simopen problem. 
+ break } + err := n.FullApi.NetConnect(ctx, m.FullNetAddrs) + if err != nil { + return nil, fmt.Errorf("failed to connect to miner %s on: %v", m.MinerActorAddr, m.FullNetAddrs) + } + t.RecordMessage("connected to full node of miner %s on %v", m.MinerActorAddr, m.FullNetAddrs) + } t.RecordMessage("waiting for all nodes to be ready") @@ -411,6 +423,15 @@ func startStorageMinerAPIServer(t *TestEnvironment, repo *repo.MemRepo, minerApi mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + exporter, err := prometheus.NewExporter(prometheus.Options{ + Namespace: "lotus", + }) + if err != nil { + return err + } + + mux.Handle("/debug/metrics", exporter) + ah := &auth.Handler{ Verify: minerApi.AuthVerify, Next: mux.ServeHTTP, diff --git a/lotus-soup/testkit/sync.go b/lotus-soup/testkit/sync.go index 0ca4327bc..27a4bc50e 100644 --- a/lotus-soup/testkit/sync.go +++ b/lotus-soup/testkit/sync.go @@ -12,7 +12,7 @@ var ( GenesisTopic = sync.NewTopic("genesis", &GenesisMsg{}) BalanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{}) PresealTopic = sync.NewTopic("preseal", &PresealMsg{}) - ClientsAddrsTopic = sync.NewTopic("clients_addrs", &peer.AddrInfo{}) + ClientsAddrsTopic = sync.NewTopic("clients_addrs", &ClientAddressesMsg{}) MinersAddrsTopic = sync.NewTopic("miners_addrs", &MinerAddressesMsg{}) PubsubTracerTopic = sync.NewTopic("pubsub_tracer", &PubsubTracerMsg{}) DrandConfigTopic = sync.NewTopic("drand_config", &DrandRuntimeInfo{}) @@ -40,9 +40,15 @@ type GenesisMsg struct { Bootstrapper []byte } +type ClientAddressesMsg struct { + PeerAddr peer.AddrInfo + WalletAddr address.Address +} + type MinerAddressesMsg struct { - PeerAddr peer.AddrInfo - ActorAddr address.Address + FullNetAddrs peer.AddrInfo + MinerNetAddrs peer.AddrInfo + MinerActorAddr address.Address } type PubsubTracerMsg struct {