lotus/lotus-soup/testkit/role_client.go
2020-07-07 14:08:51 +01:00

184 lines
4.6 KiB
Go

package testkit
import (
"context"
"fmt"
"net/http"
"time"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/specs-actors/actors/crypto"
)
type LotusClient struct {
*LotusNode
t *TestEnvironment
MinerAddrs []MinerAddressesMsg
}
func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
defer cancel()
pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
if err != nil {
return nil, err
}
drandOpt, err := GetRandomBeaconOpts(ctx, t)
if err != nil {
return nil, err
}
// first create a wallet
walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS)
if err != nil {
return nil, err
}
// publish the account ID/balance
balance := t.IntParam("balance")
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
t.SyncClient.Publish(ctx, BalanceTopic, balanceMsg)
// then collect the genesis block and bootstrapper address
genesisMsg, err := WaitForGenesis(t, ctx)
if err != nil {
return nil, err
}
clientIP := t.NetClient.MustGetDataNetworkIP().String()
nodeRepo := repo.NewMemory(nil)
// create the node
n := &LotusNode{}
stop, err := node.New(context.Background(),
node.FullAPI(&n.FullApi),
node.Online(),
node.Repo(nodeRepo),
withApiEndpoint(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", t.PortNumber("node_rpc", "0"))),
withGenesis(genesisMsg.Genesis),
withListenAddress(clientIP),
withBootstrapper(genesisMsg.Bootstrapper),
withPubsubConfig(false, pubsubTracer),
drandOpt,
)
if err != nil {
return nil, err
}
n.StopFn = stop
// set the wallet
err = n.setWallet(ctx, walletKey)
if err != nil {
_ = stop(context.TODO())
return nil, err
}
err = startFullNodeAPIServer(t, nodeRepo, n.FullApi)
if err != nil {
return nil, err
}
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
t.RecordMessage("publish our address to the clients addr topic")
addrinfo, err := n.FullApi.NetAddrsListen(ctx)
if err != nil {
return nil, err
}
t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, &ClientAddressesMsg{
PeerAddr: addrinfo,
WalletAddr: walletKey.Address,
})
t.RecordMessage("waiting for all nodes to be ready")
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
// collect miner addresses.
addrs, err := CollectMinerAddrs(t, ctx, t.IntParam("miners"))
if err != nil {
return nil, err
}
t.RecordMessage("got %v miner addrs", len(addrs))
// densely connect the client to the full node and the miners themselves.
for _, miner := range addrs {
if err := n.FullApi.NetConnect(ctx, miner.FullNetAddrs); err != nil {
return nil, fmt.Errorf("client failed to connect to full node of miner: %w", err)
}
if err := n.FullApi.NetConnect(ctx, miner.MinerNetAddrs); err != nil {
return nil, fmt.Errorf("client failed to connect to storage miner node node of miner: %w", err)
}
}
// wait for all clients to have completed identify, pubsub negotiation with miners.
time.Sleep(1 * time.Second)
peers, err := n.FullApi.NetPeers(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query connected peers: %w", err)
}
t.RecordMessage("connected peers: %d", len(peers))
cl := &LotusClient{
t: t,
LotusNode: n,
MinerAddrs: addrs,
}
return cl, nil
}
func (c *LotusClient) RunDefault() error {
// run forever
c.t.RecordMessage("running default client forever")
c.t.WaitUntilAllDone()
return nil
}
func startFullNodeAPIServer(t *TestEnvironment, repo *repo.MemRepo, api api.FullNode) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api))
ah := &auth.Handler{
Verify: api.AuthVerify,
Next: rpcServer.ServeHTTP,
}
http.Handle("/rpc/v0", ah)
exporter, err := prometheus.NewExporter(prometheus.Options{
Namespace: "lotus",
})
if err != nil {
return err
}
http.Handle("/debug/metrics", exporter)
srv := &http.Server{Handler: http.DefaultServeMux}
endpoint, err := repo.APIEndpoint()
if err != nil {
return fmt.Errorf("no API endpoint in repo: %w", err)
}
listenAddr, err := startServer(endpoint, srv)
if err != nil {
return fmt.Errorf("failed to start client API endpoint: %w", err)
}
t.RecordMessage("started node API server at %s", listenAddr)
return nil
}