WIP payments channels stress test (blocked).

This commit is contained in:
Raúl Kripalani 2020-07-07 13:58:09 +01:00
parent e899da0063
commit 5d1d7335b0
15 changed files with 430 additions and 66 deletions

View File

@ -0,0 +1,49 @@
[metadata]
name = "lotus-soup"
author = "raulk"
[global]
plan = "lotus-soup"
case = "paych-stress"
total_instances = 5 # 2 clients + 2 miners + 1 bootstrapper
builder = "exec:go"
runner = "local:exec"
[global.build_config]
enable_go_build_cache = true
[global.run_config]
exposed_ports = ["6060", "1234", "2345"]
[global.build]
selectors = ["testground"]
[global.run.test_params]
clients = "2"
miners = "2"
genesis_timestamp_offset = "0"
balance = "30" ## be careful, this is in FIL.
sectors = "10"
random_beacon_type = "mock"
mining_mode = "natural"
lane_count = "8"
increments = "30" ## in FIL
[[groups]]
id = "bootstrapper"
instances = { count = 1 }
[groups.run.test_params]
role = "bootstrapper"
[[groups]]
id = "miners"
instances = { count = 2 }
[groups.run.test_params]
role = "miner"
[[groups]]
id = "clients"
# the first client will be on the receiving end; all others will be on the sending end.
instances = { count = 2 }
[groups.run.test_params]
role = "client"

View File

@ -22,7 +22,7 @@
clients = "3"
miners = "2"
genesis_timestamp_offset = "100000"
balance = "2000000000"
balance = "2000000000" ## be careful, this is in FIL.
sectors = "10"
random_beacon_type = "mock"

View File

@ -50,12 +50,12 @@ func dealsE2E(t *testkit.TestEnvironment) error {
// select a random miner
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil {
return err
}
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1)
t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.MinerActorAddr)).Inc(1)
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr)
time.Sleep(2 * time.Second)
@ -82,7 +82,7 @@ func dealsE2E(t *testkit.TestEnvironment) error {
// start deal
t1 := time.Now()
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, fcid)
deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this

View File

@ -33,11 +33,11 @@ func dealStressTest(t *testkit.TestEnvironment) error {
// select a random miner
minerAddr := cl.MinerAddrs[rand.Intn(len(cl.MinerAddrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
if err := client.NetConnect(ctx, minerAddr.MinerNetAddrs); err != nil {
return err
}
t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
t.RecordMessage("selected %s as the miner", minerAddr.MinerActorAddr)
time.Sleep(2 * time.Second)
@ -92,12 +92,12 @@ func dealStressTest(t *testkit.TestEnvironment) error {
go func(i int) {
defer wg1.Done()
t1 := time.Now()
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, cids[i])
deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i])
t.RecordMessage("started storage deal %d -> %s", i, deal)
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal %d to be sealed", i)
testkit.WaitDealSealed(t, ctx, client, deal)
t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.ActorAddr)).Update(int64(time.Since(t1)))
t.D().ResettingHistogram(fmt.Sprintf("deal.sealed,miner=%s", minerAddr.MinerActorAddr)).Update(int64(time.Since(t1)))
}(i)
}
t.RecordMessage("waiting for all deals to be sealed")
@ -123,7 +123,7 @@ func dealStressTest(t *testkit.TestEnvironment) error {
} else {
for i := 0; i < deals; i++ {
deal := testkit.StartDeal(ctx, minerAddr.ActorAddr, client, cids[i])
deal := testkit.StartDeal(ctx, minerAddr.MinerActorAddr, client, cids[i])
t.RecordMessage("started storage deal %d -> %s", i, deal)
time.Sleep(2 * time.Second)
t.RecordMessage("waiting for deal %d to be sealed", i)

View File

@ -3,6 +3,7 @@ module github.com/filecoin-project/oni/lotus-soup
go 1.14
require (
contrib.go.opencensus.io/exporter/prometheus v0.1.0
github.com/davecgh/go-spew v1.1.1
github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef

View File

@ -27,6 +27,7 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
contrib.go.opencensus.io/exporter/jaeger v0.1.0/go.mod h1:VYianECmuFPwU37O699Vc1GOcy+y8kOsfaxHRImmjbA=
contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg=
contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A=
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@ -245,10 +246,6 @@ github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIi
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/lotus v0.4.2-0.20200706092412-516e31d37cd7 h1:eE3a712/0rcnml1lqwtGHYz9JrX4vLqBk9yBdYtH9Jo=
github.com/filecoin-project/lotus v0.4.2-0.20200706092412-516e31d37cd7/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI=
github.com/filecoin-project/lotus v0.4.2-0.20200706153752-f8b65d391143 h1:03PUHBPtjNmxdPbOL258pXLVE4Dz3xjsY86THGV1UQQ=
github.com/filecoin-project/lotus v0.4.2-0.20200706153752-f8b65d391143/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI=
github.com/filecoin-project/lotus v0.4.2-0.20200706172415-cf6ac44b6ec5 h1:wsEkRwhcWvaZowowC2Kj9ueJ2vIRDqOxOcFvqqgHdxE=
github.com/filecoin-project/lotus v0.4.2-0.20200706172415-cf6ac44b6ec5/go.mod h1:uo3yDPhPlpHwdCKr0k41/a205WwlSclQamx+sQDKRMI=
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=

View File

@ -1,22 +1,54 @@
package main
import (
"github.com/testground/sdk-go/run"
"os"
"github.com/filecoin-project/oni/lotus-soup/paych"
"github.com/filecoin-project/oni/lotus-soup/testkit"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/testground/sdk-go/run"
logging "github.com/ipfs/go-log/v2"
)
var cases = map[string]interface{}{
"deals-e2e": testkit.WrapTestEnvironment(dealsE2E),
"deals-stress-test": testkit.WrapTestEnvironment(dealStressTest),
"drand-halting": testkit.WrapTestEnvironment(dealsE2E),
"paych-stress": testkit.WrapTestEnvironment(paych.Stress),
}
func init() {
build.BlockDelaySecs = 2
build.PropagationDelaySecs = 4
_ = logging.SetLogLevel("*", "WARN")
_ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR") // noisy
_ = logging.SetLogLevel("bitswap", "ERROR") // noisy
_ = os.Setenv("BELLMAN_NO_GPU", "1")
build.InsecurePoStValidation = true
build.DisableBuiltinAssets = true
// MessageConfidence is the amount of tipsets we wait after a message is
// mined, e.g. payment channel creation, to be considered committed.
build.MessageConfidence = 1
power.ConsensusMinerMinPower = big.NewInt(2048)
miner.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
abi.RegisteredSealProof_StackedDrg2KiBV1: {},
}
verifreg.MinVerifiedDealSize = big.NewInt(256)
}
func main() {

View File

@ -22,6 +22,12 @@ enabled = true
[runners."cluster:k8s"]
enabled = true
######################
##
## Testcases
##
######################
[[testcases]]
name = "deals-e2e"
instances = { min = 1, max = 100, default = 5 }
@ -50,6 +56,7 @@ instances = { min = 1, max = 100, default = 5 }
enable_pubsub_tracer = { type = "bool", default = false }
mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] }
[[testcases]]
name = "drand-halting"
instances = { min = 1, max = 100, default = 5 }
@ -79,6 +86,7 @@ instances = { min = 1, max = 100, default = 5 }
enable_pubsub_tracer = { type = "bool", default = false } # Mining Mode: synchronized -vs- natural time
mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] }
[[testcases]]
name = "deals-stress-test"
instances = { min = 1, max = 100, default = 5 }
@ -110,3 +118,36 @@ instances = { min = 1, max = 100, default = 5 }
deals = { type = "int", default = 1 }
deal_mode = { type = "enum", default = "serial", options = ["serial", "concurrent"] }
[[testcases]]
name = "paych-stress"
instances = { min = 1, max = 100, default = 5 }
[testcases.params]
clients = { type = "int", default = 1 }
miners = { type = "int", default = 1 }
balance = { type = "int", default = 1 }
sectors = { type = "int", default = 1 }
role = { type = "string" }
genesis_timestamp_offset = { type = "int", default = 0 }
random_beacon_type = { type = "enum", default = "local-drand", options = ["mock", "local-drand", "external-drand"] }
# Params relevant to drand nodes. drand nodes should have role="drand", and must all be
# in the same composition group. There must be at least threshold drand nodes.
# To get lotus nodes to actually use the drand nodes, you must set random_beacon_type="local-drand"
# for the lotus node groups.
drand_period = { type = "duration", default="10s" }
drand_threshold = { type = "int", default = 2 }
drand_gossip_relay = { type = "bool", default = true }
drand_log_level = { type = "string", default="info" }
suspend_events = { type = "string", default="", desc = "a sequence of halt/resume/wait events separated by '->'" }
# Params relevant to pubsub tracing
enable_pubsub_tracer = { type = "bool", default = false } # Mining Mode: synchronized -vs- natural time
mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] }
# ********** Test-case specific **********
increments = { type = "int", default = "100", desc = "increments in which to send payment vouchers" }
lane_count = { type = "int", default = "256", desc = "lanes to open; vouchers will be distributed across these lanes in round-robin fashion" }

View File

@ -0,0 +1,32 @@
# Payment channels end-to-end tests
This package contains the following test cases, each of which is described
further below.
- Payment channels stress test case (`stress.go`).
## Payment channels stress test case (`stress.go`)
***WIP | blocked due to https://github.com/filecoin-project/lotus/issues/2297***
This test case turns all clients into payment receivers and senders.
The first member to start in the group becomes the _receiver_.
All other members become _senders_.
The _senders_ will open a single payment channel to the _receiver_, and will
wait for the message to be posted on-chain. We are setting
`build.MessageConfidence=1`, in order to accelerate the test. So we'll only wait
for a single tipset confirmation once we witness the message.
Once the message is posted, we load the payment channel actor address and create
as many lanes as the `lane_count` test parameter dictates.
When then fetch our total balance, and start sending it on the payment channel,
round-robinning across all lanes, until our balance is extinguished.
**TODO:**
- [ ] Assertions, metrics, etc. Actually gather statistics. Right now this is
just a smoke test, and it fails.
- [ ] Implement the _receiver_ logic.
- [ ] Model test lifetime by signalling end.

160
lotus-soup/paych/stress.go Normal file
View File

@ -0,0 +1,160 @@
package paych
import (
"context"
"fmt"
"os"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/testground/sdk-go/sync"
"github.com/filecoin-project/oni/lotus-soup/testkit"
)
var SendersDoneState = sync.State("senders-done")
// TODO Stress is currently WIP. We found blockers in Lotus that prevent us from
// making progress. See https://github.com/filecoin-project/lotus/issues/2297.
func Stress(t *testkit.TestEnvironment) error {
// Dispatch/forward non-client roles to defaults.
if t.Role != "client" {
return testkit.HandleDefaultRole(t)
}
// This is a client role.
t.RecordMessage("running payments client")
var (
// lanes to open; vouchers will be distributed across these lanes in round-robin fashion
laneCount = t.IntParam("lane_count")
// increments in which to send payment vouchers
increments = big.Mul(big.NewInt(int64(t.IntParam("increments"))), abi.TokenPrecision)
)
ctx := context.Background()
cl, err := testkit.PrepareClient(t)
if err != nil {
return err
}
// are we the receiver or a sender?
mode := "sender"
if t.GroupSeq == 1 {
mode = "receiver"
}
t.RecordMessage("acting as %s", mode)
var clients []*testkit.ClientAddressesMsg
sctx, cancel := context.WithCancel(ctx)
clientsCh := make(chan *testkit.ClientAddressesMsg)
t.SyncClient.MustSubscribe(sctx, testkit.ClientsAddrsTopic, clientsCh)
for i := 0; i < t.TestGroupInstanceCount; i++ {
clients = append(clients, <-clientsCh)
}
cancel()
switch mode {
case "receiver":
// one receiver, everyone else is a sender.
<-t.SyncClient.MustBarrier(ctx, SendersDoneState, t.TestGroupInstanceCount-1).C
case "sender":
// we're going to lock up all our funds into this one payment channel.
recv := clients[0]
balance, err := cl.FullApi.WalletBalance(ctx, cl.Wallet.Address)
if err != nil {
return fmt.Errorf("failed to acquire wallet balance: %w", err)
}
t.RecordMessage("my balance: %d", balance)
t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance)
pid := os.Getpid()
t.RecordMessage("sender pid: %d", pid)
time.Sleep(20 * time.Second)
channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance)
if err != nil {
return fmt.Errorf("failed to create payment channel: %w", err)
}
if addr := channel.Channel; addr != address.Undef {
return fmt.Errorf("expected an Undef channel address, got: %s", addr)
}
t.RecordMessage("payment channel created; msg_cid=%s", channel.ChannelMessage)
t.RecordMessage("waiting for payment channel message to appear on chain")
// wait for the channel creation message to appear on chain.
_, err = cl.FullApi.StateWaitMsg(ctx, channel.ChannelMessage, 2)
if err != nil {
return fmt.Errorf("failed while waiting for payment channel creation msg to appear on chain: %w", err)
}
// need to wait so that the channel is tracked.
// the full API waits for build.MessageConfidence (=1 in tests) before tracking the channel.
// we wait for 2 confirmations, so we have the assurance the channel is tracked.
t.RecordMessage("reloading paych; now it should have an address")
channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero())
if err != nil {
return fmt.Errorf("failed to reload payment channel: %w", err)
}
t.RecordMessage("channel address: %s", channel.Channel)
t.RecordMessage("allocating lanes; count=%d", laneCount)
// allocate as many lanes as required
var lanes []uint64
for i := 0; i < laneCount; i++ {
lane, err := cl.FullApi.PaychAllocateLane(ctx, channel.Channel)
if err != nil {
return fmt.Errorf("failed to allocate lane: %w", err)
}
lanes = append(lanes, lane)
}
t.RecordMessage("lanes allocated; count=%d", laneCount)
t.RecordMessage("sending payments in round-robin fashion across lanes; increments=%d", increments)
// start sending payments
zero := big.Zero()
Outer:
for remaining := balance; remaining.GreaterThan(zero); {
for _, lane := range lanes {
voucher, err := cl.FullApi.PaychVoucherCreate(ctx, channel.Channel, increments, lane)
if err != nil {
return fmt.Errorf("failed to create voucher: %w", err)
}
t.RecordMessage("payment voucher created; lane=%d, nonce=%d, amount=%d", voucher.Lane, voucher.Nonce, voucher.Amount)
cid, err := cl.FullApi.PaychVoucherSubmit(ctx, channel.Channel, voucher)
if err != nil {
return fmt.Errorf("failed to submit voucher: %w", err)
}
t.RecordMessage("payment voucher submitted; msg_cid=%s, lane=%d, nonce=%d, amount=%d", cid, voucher.Lane, voucher.Nonce, voucher.Amount)
remaining = types.BigSub(remaining, increments)
t.RecordMessage("remaining balance: %d", remaining)
if remaining.LessThanEqual(zero) {
// we have no more funds remaining.
break Outer
}
}
}
t.RecordMessage("finished sending all payment vouchers")
t.SyncClient.MustSignalEntry(ctx, SendersDoneState)
}
return nil
}

View File

@ -9,51 +9,28 @@ import (
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules/dtypes"
modtest "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
logging "github.com/ipfs/go-log/v2"
influxdb "github.com/kpacha/opencensus-influxdb"
ma "github.com/multiformats/go-multiaddr"
tstats "github.com/filecoin-project/lotus/tools/stats"
"github.com/libp2p/go-libp2p-core/peer"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/kpacha/opencensus-influxdb"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr-net"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)
func init() {
_ = logging.SetLogLevel("*", "WARN")
_ = os.Setenv("BELLMAN_NO_GPU", "1")
build.InsecurePoStValidation = true
build.DisableBuiltinAssets = true
power.ConsensusMinerMinPower = big.NewInt(2048)
saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
abi.RegisteredSealProof_StackedDrg2KiBV1: {},
}
verifreg.MinVerifiedDealSize = big.NewInt(256)
}
var PrepareNodeTimeout = time.Minute
type LotusNode struct {
FullApi api.FullNode
MinerApi api.StorageMiner
StopFn node.StopFunc
Wallet *wallet.Key
MineOne func(context.Context, func(bool, error)) error
}
@ -68,6 +45,8 @@ func (n *LotusNode) setWallet(ctx context.Context, walletKey *wallet.Key) error
return err
}
n.Wallet = walletKey
return nil
}
@ -138,11 +117,11 @@ func CollectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]M
return addrs, nil
}
func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
ch := make(chan peer.AddrInfo)
func CollectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]*ClientAddressesMsg, error) {
ch := make(chan *ClientAddressesMsg)
sub := t.SyncClient.MustSubscribe(ctx, ClientsAddrsTopic, ch)
addrs := make([]peer.AddrInfo, 0, clients)
addrs := make([]*ClientAddressesMsg, 0, clients)
for i := 0; i < clients; i++ {
select {
case a := <-ch:

View File

@ -55,6 +55,16 @@ func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) {
return nil, err
}
totalBalance := big.Zero()
for _, b := range balances {
totalBalance = big.Add(big.NewInt(int64(b.Balance)), totalBalance)
}
t.RecordMessage("TOTAL BALANCE: %s", totalBalance)
if max := types.TotalFilecoinInt; totalBalance.GreaterThanEqual(max) {
panic(fmt.Sprintf("total sum of balances is greater than max Filecoin ever; sum=%s, max=%s", totalBalance, max))
}
// then collect all preseals from miners
preseals, err := CollectPreseals(t, ctx, miners)
if err != nil {
@ -66,10 +76,12 @@ func PrepareBootstrapper(t *TestEnvironment) (*Bootstrapper, error) {
var genesisMiners []genesis.Miner
for _, bm := range balances {
balance := big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision))
t.RecordMessage("balance assigned to actor %s: %s AttoFIL", bm.Addr, balance)
genesisActors = append(genesisActors,
genesis.Actor{
Type: genesis.TAccount,
Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)),
Balance: balance,
Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(),
})
}

View File

@ -4,7 +4,9 @@ import (
"context"
"fmt"
"net/http"
"time"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
@ -94,7 +96,10 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
if err != nil {
return nil, err
}
t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, addrinfo)
t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, &ClientAddressesMsg{
PeerAddr: addrinfo,
WalletAddr: walletKey.Address,
})
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
@ -106,6 +111,26 @@ func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
}
t.RecordMessage("got %v miner addrs", len(addrs))
// densely connect the client to the full node and the miners themselves.
for _, miner := range addrs {
if err := n.FullApi.NetConnect(ctx, miner.FullNetAddrs); err != nil {
return nil, fmt.Errorf("client failed to connect to full node of miner: %w", err)
}
if err := n.FullApi.NetConnect(ctx, miner.MinerNetAddrs); err != nil {
return nil, fmt.Errorf("client failed to connect to storage miner node node of miner: %w", err)
}
}
// wait for all clients to have completed identify, pubsub negotiation with miners.
time.Sleep(1 * time.Second)
peers, err := n.FullApi.NetPeers(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query connected peers: %w", err)
}
t.RecordMessage("connected peers: %d", len(peers))
cl := &LotusClient{
t: t,
LotusNode: n,
@ -132,6 +157,15 @@ func startFullNodeAPIServer(t *TestEnvironment, repo *repo.MemRepo, api api.Full
http.Handle("/rpc/v0", ah)
exporter, err := prometheus.NewExporter(prometheus.Options{
Namespace: "lotus",
})
if err != nil {
return err
}
http.Handle("/debug/metrics", exporter)
srv := &http.Server{Handler: http.DefaultServeMux}
endpoint, err := repo.APIEndpoint()

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
@ -200,6 +201,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
mineBlock := make(chan func(bool, error))
minerOpts = append(minerOpts,
node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr)))
n.MineOne = func(ctx context.Context, cb func(bool, error)) error {
select {
case mineBlock <- cb:
@ -232,16 +234,16 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
go collectStats(t, ctx, n.FullApi)
}
// Bootstrap with full node
remoteAddrs, err := n.FullApi.NetAddrsListen(ctx)
// Start listening on the full node.
fullNodeNetAddrs, err := n.FullApi.NetAddrsListen(ctx)
if err != nil {
panic(err)
}
err = n.MinerApi.NetConnect(ctx, remoteAddrs)
if err != nil {
panic(err)
}
// err = n.MinerApi.NetConnect(ctx, fullNodeNetAddrs)
// if err != nil {
// panic(err)
// }
// add local storage for presealed sectors
err = n.MinerApi.StorageAddLocal(ctx, presealDir)
@ -273,31 +275,41 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
}
t.RecordMessage("publish our address to the miners addr topic")
actoraddress, err := n.MinerApi.ActorAddress(ctx)
minerActor, err := n.MinerApi.ActorAddress(ctx)
if err != nil {
return nil, err
}
addrinfo, err := n.MinerApi.NetAddrsListen(ctx)
minerNetAddrs, err := n.MinerApi.NetAddrsListen(ctx)
if err != nil {
return nil, err
}
t.SyncClient.MustPublish(ctx, MinersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress})
t.SyncClient.MustPublish(ctx, MinersAddrsTopic, MinerAddressesMsg{
FullNetAddrs: fullNodeNetAddrs,
MinerNetAddrs: minerNetAddrs,
MinerActorAddr: minerActor,
})
t.RecordMessage("connecting to all other miners")
// connect to all other miners.
// densely connect the miner's full nodes.
minerCh := make(chan *MinerAddressesMsg, 16)
sctx, cancel := context.WithCancel(ctx)
defer cancel()
t.SyncClient.MustSubscribe(sctx, MinersAddrsTopic, minerCh)
for i := 0; i < t.IntParam("miners"); i++ {
if miner := <-minerCh; miner.ActorAddr != actoraddress {
err := n.FullApi.NetConnect(ctx, miner.PeerAddr)
if err != nil {
return nil, fmt.Errorf("failed to connect to miner %s on: %v", miner.ActorAddr, miner.PeerAddr)
}
t.RecordMessage("connected to miner %s on %v", miner.ActorAddr, miner.PeerAddr)
m := <-minerCh
if m.MinerActorAddr == minerActor {
// once I find myself, I stop connecting to others, to avoid a simopen problem.
break
}
err := n.FullApi.NetConnect(ctx, m.FullNetAddrs)
if err != nil {
return nil, fmt.Errorf("failed to connect to miner %s on: %v", m.MinerActorAddr, m.FullNetAddrs)
}
t.RecordMessage("connected to full node of miner %s on %v", m.MinerActorAddr, m.FullNetAddrs)
}
t.RecordMessage("waiting for all nodes to be ready")
@ -411,6 +423,15 @@ func startStorageMinerAPIServer(t *TestEnvironment, repo *repo.MemRepo, minerApi
mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
exporter, err := prometheus.NewExporter(prometheus.Options{
Namespace: "lotus",
})
if err != nil {
return err
}
mux.Handle("/debug/metrics", exporter)
ah := &auth.Handler{
Verify: minerApi.AuthVerify,
Next: mux.ServeHTTP,

View File

@ -12,7 +12,7 @@ var (
GenesisTopic = sync.NewTopic("genesis", &GenesisMsg{})
BalanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{})
PresealTopic = sync.NewTopic("preseal", &PresealMsg{})
ClientsAddrsTopic = sync.NewTopic("clients_addrs", &peer.AddrInfo{})
ClientsAddrsTopic = sync.NewTopic("clients_addrs", &ClientAddressesMsg{})
MinersAddrsTopic = sync.NewTopic("miners_addrs", &MinerAddressesMsg{})
PubsubTracerTopic = sync.NewTopic("pubsub_tracer", &PubsubTracerMsg{})
DrandConfigTopic = sync.NewTopic("drand_config", &DrandRuntimeInfo{})
@ -40,9 +40,15 @@ type GenesisMsg struct {
Bootstrapper []byte
}
type ClientAddressesMsg struct {
PeerAddr peer.AddrInfo
WalletAddr address.Address
}
type MinerAddressesMsg struct {
PeerAddr peer.AddrInfo
ActorAddr address.Address
FullNetAddrs peer.AddrInfo
MinerNetAddrs peer.AddrInfo
MinerActorAddr address.Address
}
type PubsubTracerMsg struct {