200 lines
4.9 KiB
Go
200 lines
4.9 KiB
Go
package testkit
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"contrib.go.opencensus.io/exporter/prometheus"
|
|
"github.com/gorilla/mux"
|
|
"github.com/hashicorp/go-multierror"
|
|
|
|
"github.com/filecoin-project/go-jsonrpc"
|
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/wallet/key"
|
|
"github.com/filecoin-project/lotus/node"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
)
|
|
|
|
type LotusClient struct {
|
|
*LotusNode
|
|
|
|
t *TestEnvironment
|
|
MinerAddrs []MinerAddressesMsg
|
|
}
|
|
|
|
func PrepareClient(t *TestEnvironment) (*LotusClient, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout)
|
|
defer cancel()
|
|
|
|
ApplyNetworkParameters(t)
|
|
|
|
pubsubTracer, err := GetPubsubTracerMaddr(ctx, t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
drandOpt, err := GetRandomBeaconOpts(ctx, t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// first create a wallet
|
|
walletKey, err := key.GenerateKey(types.KTBLS)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// publish the account ID/balance
|
|
balance := t.FloatParam("balance")
|
|
balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance}
|
|
t.SyncClient.Publish(ctx, BalanceTopic, balanceMsg)
|
|
|
|
// then collect the genesis block and bootstrapper address
|
|
genesisMsg, err := WaitForGenesis(t, ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientIP := t.NetClient.MustGetDataNetworkIP().String()
|
|
|
|
nodeRepo := repo.NewMemory(nil)
|
|
|
|
// create the node
|
|
n := &LotusNode{}
|
|
stop, err := node.New(context.Background(),
|
|
node.FullAPI(&n.FullApi),
|
|
node.Base(),
|
|
node.Repo(nodeRepo),
|
|
withApiEndpoint(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", t.PortNumber("node_rpc", "0"))),
|
|
withGenesis(genesisMsg.Genesis),
|
|
withListenAddress(clientIP),
|
|
withBootstrapper(genesisMsg.Bootstrapper),
|
|
withPubsubConfig(false, pubsubTracer),
|
|
drandOpt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// set the wallet
|
|
err = n.setWallet(ctx, walletKey)
|
|
if err != nil {
|
|
_ = stop(context.TODO())
|
|
return nil, err
|
|
}
|
|
|
|
fullSrv, err := startFullNodeAPIServer(t, nodeRepo, n.FullApi)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
n.StopFn = func(ctx context.Context) error {
|
|
var err *multierror.Error
|
|
err = multierror.Append(fullSrv.Shutdown(ctx))
|
|
err = multierror.Append(stop(ctx))
|
|
return err.ErrorOrNil()
|
|
}
|
|
|
|
registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq))
|
|
|
|
t.RecordMessage("publish our address to the clients addr topic")
|
|
addrinfo, err := n.FullApi.NetAddrsListen(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
t.SyncClient.MustPublish(ctx, ClientsAddrsTopic, &ClientAddressesMsg{
|
|
PeerNetAddr: addrinfo,
|
|
WalletAddr: walletKey.Address,
|
|
GroupSeq: t.GroupSeq,
|
|
})
|
|
|
|
t.RecordMessage("waiting for all nodes to be ready")
|
|
t.SyncClient.MustSignalAndWait(ctx, StateReady, t.TestInstanceCount)
|
|
|
|
// collect miner addresses.
|
|
addrs, err := CollectMinerAddrs(t, ctx, t.IntParam("miners"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
t.RecordMessage("got %v miner addrs", len(addrs))
|
|
|
|
// densely connect the client to the full node and the miners themselves.
|
|
for _, miner := range addrs {
|
|
if err := n.FullApi.NetConnect(ctx, miner.FullNetAddrs); err != nil {
|
|
return nil, fmt.Errorf("client failed to connect to full node of miner: %w", err)
|
|
}
|
|
if err := n.FullApi.NetConnect(ctx, miner.MinerNetAddrs); err != nil {
|
|
return nil, fmt.Errorf("client failed to connect to storage miner node node of miner: %w", err)
|
|
}
|
|
}
|
|
|
|
// wait for all clients to have completed identify, pubsub negotiation with miners.
|
|
time.Sleep(1 * time.Second)
|
|
|
|
peers, err := n.FullApi.NetPeers(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query connected peers: %w", err)
|
|
}
|
|
|
|
t.RecordMessage("connected peers: %d", len(peers))
|
|
|
|
cl := &LotusClient{
|
|
t: t,
|
|
LotusNode: n,
|
|
MinerAddrs: addrs,
|
|
}
|
|
return cl, nil
|
|
}
|
|
|
|
func (c *LotusClient) RunDefault() error {
|
|
// run forever
|
|
c.t.RecordMessage("running default client forever")
|
|
c.t.WaitUntilAllDone()
|
|
return nil
|
|
}
|
|
|
|
func startFullNodeAPIServer(t *TestEnvironment, repo repo.Repo, napi api.FullNode) (*http.Server, error) {
|
|
mux := mux.NewRouter()
|
|
|
|
rpcServer := jsonrpc.NewServer()
|
|
rpcServer.Register("Filecoin", napi)
|
|
|
|
mux.Handle("/rpc/v0", rpcServer)
|
|
|
|
exporter, err := prometheus.NewExporter(prometheus.Options{
|
|
Namespace: "lotus",
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mux.Handle("/debug/metrics", exporter)
|
|
|
|
ah := &auth.Handler{
|
|
Verify: func(ctx context.Context, token string) ([]auth.Permission, error) {
|
|
return api.AllPermissions, nil
|
|
},
|
|
Next: mux.ServeHTTP,
|
|
}
|
|
|
|
srv := &http.Server{Handler: ah, ReadHeaderTimeout: 30 * time.Second}
|
|
|
|
endpoint, err := repo.APIEndpoint()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("no API endpoint in repo: %w", err)
|
|
}
|
|
|
|
listenAddr, err := startServer(endpoint, srv)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to start client API endpoint: %w", err)
|
|
}
|
|
|
|
t.RecordMessage("started node API server at %s", listenAddr)
|
|
return srv, nil
|
|
}
|