diff --git a/lotus-soup/baseline.go b/lotus-soup/baseline.go deleted file mode 100644 index 134d42fec..000000000 --- a/lotus-soup/baseline.go +++ /dev/null @@ -1,253 +0,0 @@ -package main - -import ( - "bytes" - "context" - "fmt" - "io/ioutil" - "math/rand" - "os" - "path/filepath" - "time" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/ipfs/go-cid" - files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" - dag "github.com/ipfs/go-merkledag" - dstest "github.com/ipfs/go-merkledag/test" - unixfile "github.com/ipfs/go-unixfs/file" - "github.com/ipld/go-car" -) - -// This is the basline test; Filecoin 101. -// -// A network with a bootstrapper, a number of miners, and a number of clients/full nodes -// is constructed and connected through the bootstrapper. -// Some funds are allocated to each node and a number of sectors are presealed in the genesis block. -// -// The test plan: -// One or more clients store content to one or more miners, testing storage deals. -// The plan ensures that the storage deals hit the blockchain and measure the time it took. -// Verification: one or more clients retrieve and verify the hashes of stored content. -// The plan ensures that all (previously) published content can be correctly retrieved -// and measures the time it took. -// -// Preparation of the genesis block: this is the responsibility of the bootstrapper. -// In order to compute the genesis block, we need to collect identities and presealed -// sectors from each node. -// The we create a genesis block that allocates some funds to each node and collects -// the presealed sectors. -var baselineRoles = map[string]func(*TestEnvironment) error{ - "bootstrapper": runBootstrapper, - "miner": runMiner, - "client": runBaselineClient, - "drand": runDrandNode, - "pubsub-tracer": runPubsubTracer, -} - -func runBaselineClient(t *TestEnvironment) error { - t.RecordMessage("running client") - cl, err := prepareClient(t) - if err != nil { - return err - } - - ctx := context.Background() - addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners")) - if err != nil { - return err - } - t.RecordMessage("got %v miner addrs", len(addrs)) - - client := cl.fullApi - - // select a random miner - minerAddr := addrs[rand.Intn(len(addrs))] - if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil { - return err - } - t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1) - - t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr) - - time.Sleep(2 * time.Second) - - // generate random data - data := make([]byte, 1600) - rand.New(rand.NewSource(time.Now().UnixNano())).Read(data) - - file, err := ioutil.TempFile("/tmp", "data") - if err != nil { - return err - } - defer os.Remove(file.Name()) - - _, err = file.Write(data) - if err != nil { - return err - } - - fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false}) - if err != nil { - return err - } - t.RecordMessage("file cid: %s", fcid) - - // start deal - t1 := time.Now() - deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid) - t.RecordMessage("started deal: %s", deal) - - // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this - time.Sleep(2 * time.Second) - - t.RecordMessage("waiting for deal to be sealed") - waitDealSealed(t, ctx, client, deal) - t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1))) - - carExport := true - - t.RecordMessage("trying to retrieve %s", fcid) - retrieveData(t, ctx, err, client, fcid, carExport, data) - t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1))) - - t.SyncClient.MustSignalEntry(ctx, stateStopMining) - - time.Sleep(10 * time.Second) // wait for metrics to be emitted - - // TODO broadcast published content CIDs to other clients - // TODO select a random piece of content published by some other client and retrieve it - - t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) - return nil -} - -func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid { - addr, err := client.WalletDefaultAddress(ctx) - if err != nil { - panic(err) - } - - deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{ - Data: &storagemarket.DataRef{Root: fcid}, - Wallet: addr, - Miner: minerActorAddr, - EpochPrice: types.NewInt(1000000), - MinBlocksDuration: 1000, - }) - if err != nil { - panic(err) - } - return deal -} - -func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) { -loop: - for { - di, err := client.ClientGetDealInfo(ctx, *deal) - if err != nil { - panic(err) - } - switch di.State { - case storagemarket.StorageDealProposalRejected: - panic("deal rejected") - case storagemarket.StorageDealFailing: - panic("deal failed") - case storagemarket.StorageDealError: - panic(fmt.Sprintf("deal errored %s", di.Message)) - case storagemarket.StorageDealActive: - t.RecordMessage("completed deal: %s", di) - break loop - } - t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State]) - time.Sleep(2 * time.Second) - } -} - -func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) { - t1 := time.Now() - offers, err := client.ClientFindData(ctx, fcid) - if err != nil { - panic(err) - } - for _, o := range offers { - t.D().Counter(fmt.Sprintf("find-data.offer,miner=%s", o.Miner)).Inc(1) - } - t.D().ResettingHistogram("find-data").Update(int64(time.Since(t1))) - - if len(offers) < 1 { - panic("no offers") - } - - rpath, err := ioutil.TempDir("", "lotus-retrieve-test-") - if err != nil { - panic(err) - } - defer os.RemoveAll(rpath) - - caddr, err := client.WalletDefaultAddress(ctx) - if err != nil { - panic(err) - } - - ref := &api.FileRef{ - Path: filepath.Join(rpath, "ret"), - IsCAR: carExport, - } - t1 = time.Now() - err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) - if err != nil { - panic(err) - } - t.D().ResettingHistogram("retrieve-data").Update(int64(time.Since(t1))) - - rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret")) - if err != nil { - panic(err) - } - - if carExport { - rdata = extractCarData(ctx, rdata, rpath) - } - - if !bytes.Equal(rdata, data) { - panic("wrong data retrieved") - } - - t.RecordMessage("retrieved successfully") -} - -func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte { - bserv := dstest.Bserv() - ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata)) - if err != nil { - panic(err) - } - b, err := bserv.GetBlock(ctx, ch.Roots[0]) - if err != nil { - panic(err) - } - nd, err := ipld.Decode(b) - if err != nil { - panic(err) - } - dserv := dag.NewDAGService(bserv) - fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd) - if err != nil { - panic(err) - } - outPath := filepath.Join(rpath, "retLoadedCAR") - if err := files.WriteTo(fil, outPath); err != nil { - panic(err) - } - rdata, err = ioutil.ReadFile(outPath) - if err != nil { - panic(err) - } - return rdata -} diff --git a/lotus-soup/common_roles.go b/lotus-soup/common_roles.go deleted file mode 100644 index f6aa2454f..000000000 --- a/lotus-soup/common_roles.go +++ /dev/null @@ -1,106 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/filecoin-project/lotus/build" - "github.com/testground/sdk-go/sync" -) - -func runBootstrapper(t *TestEnvironment) error { - t.RecordMessage("running bootstrapper") - _, err := prepareBootstrapper(t) - if err != nil { - return err - } - - ctx := context.Background() - t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) - return nil -} - -func runMiner(t *TestEnvironment) error { - t.RecordMessage("running miner") - miner, err := prepareMiner(t) - if err != nil { - return err - } - - t.RecordMessage("block delay: %v", build.BlockDelay) - t.D().Gauge("miner.block-delay").Update(build.BlockDelay) - - ctx := context.Background() - - clients := t.IntParam("clients") - miners := t.IntParam("miners") - - myActorAddr, err := miner.minerApi.ActorAddress(ctx) - if err != nil { - return err - } - - // mine / stop mining - mine := true - done := make(chan struct{}) - - if miner.MineOne != nil { - go func() { - defer t.RecordMessage("shutting down mining") - defer close(done) - - var i int - for i = 0; mine; i++ { - // synchronize all miners to mine the next block - t.RecordMessage("synchronizing all miners to mine next block [%d]", i) - stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i)) - t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners) - - ch := make(chan struct{}) - err := miner.MineOne(ctx, func(mined bool) { - if mined { - t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1) - } - close(ch) - }) - if err != nil { - panic(err) - } - <-ch - } - - // signal the last block to make sure no miners are left stuck waiting for the next block signal - // while the others have stopped - stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i)) - t.SyncClient.MustSignalEntry(ctx, stateMineLast) - }() - } else { - close(done) - } - - // wait for a signal from all clients to stop mining - err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C - if err != nil { - return err - } - - mine = false - <-done - - t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) - return nil -} - -func runDrandNode(t *TestEnvironment) error { - t.RecordMessage("running drand node") - dr, err := prepareDrandNode(t) - if err != nil { - return err - } - defer dr.Cleanup() - - // TODO add ability to halt / recover on demand - ctx := context.Background() - t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) - return nil -} diff --git a/lotus-soup/deals.go b/lotus-soup/deals.go new file mode 100644 index 000000000..5ae2699a5 --- /dev/null +++ b/lotus-soup/deals.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +func startDeal(ctx context.Context, minerActorAddr address.Address, client api.FullNode, fcid cid.Cid) *cid.Cid { + addr, err := client.WalletDefaultAddress(ctx) + if err != nil { + panic(err) + } + + deal, err := client.ClientStartDeal(ctx, &api.StartDealParams{ + Data: &storagemarket.DataRef{Root: fcid}, + Wallet: addr, + Miner: minerActorAddr, + EpochPrice: types.NewInt(1000000), + MinBlocksDuration: 1000, + }) + if err != nil { + panic(err) + } + return deal +} + +func waitDealSealed(t *TestEnvironment, ctx context.Context, client api.FullNode, deal *cid.Cid) { +loop: + for { + di, err := client.ClientGetDealInfo(ctx, *deal) + if err != nil { + panic(err) + } + switch di.State { + case storagemarket.StorageDealProposalRejected: + panic("deal rejected") + case storagemarket.StorageDealFailing: + panic("deal failed") + case storagemarket.StorageDealError: + panic(fmt.Sprintf("deal errored %s", di.Message)) + case storagemarket.StorageDealActive: + t.RecordMessage("completed deal: %s", di) + break loop + } + t.RecordMessage("deal state: %s", storagemarket.DealStates[di.State]) + time.Sleep(2 * time.Second) + } +} diff --git a/lotus-soup/lotus_opts.go b/lotus-soup/lotus_opts.go new file mode 100644 index 000000000..a02befb56 --- /dev/null +++ b/lotus-soup/lotus_opts.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/lp2p" + "github.com/filecoin-project/lotus/node/repo" + + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +func withGenesis(gb []byte) node.Option { + return node.Override(new(modules.Genesis), modules.LoadGenesis(gb)) +} + +func withBootstrapper(ab []byte) node.Option { + return node.Override(new(dtypes.BootstrapPeers), + func() (dtypes.BootstrapPeers, error) { + if ab == nil { + return dtypes.BootstrapPeers{}, nil + } + + a, err := ma.NewMultiaddrBytes(ab) + if err != nil { + return nil, err + } + ai, err := peer.AddrInfoFromP2pAddr(a) + if err != nil { + return nil, err + } + return dtypes.BootstrapPeers{*ai}, nil + }) +} + +func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option { + return node.Override(new(*config.Pubsub), func() *config.Pubsub { + return &config.Pubsub{ + Bootstrapper: bootstrapper, + RemoteTracer: pubsubTracer, + } + }) +} + +func withListenAddress(ip string) node.Option { + addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4001", ip)} + return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) +} + +func withMinerListenAddress(ip string) node.Option { + addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)} + return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) +} + +func withApiEndpoint(addr string) node.Option { + return node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := ma.NewMultiaddr(addr) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }) +} diff --git a/lotus-soup/main.go b/lotus-soup/main.go index e0906be1e..051f51eb6 100644 --- a/lotus-soup/main.go +++ b/lotus-soup/main.go @@ -8,7 +8,7 @@ import ( ) var testplans = map[string]interface{}{ - "lotus-baseline": doRun(baselineRoles), + "lotus-baseline": doRun(basicRoles), } func main() { diff --git a/lotus-soup/node.go b/lotus-soup/node.go index 13bb959f7..95a628607 100644 --- a/lotus-soup/node.go +++ b/lotus-soup/node.go @@ -1,62 +1,35 @@ package main import ( - "bytes" "context" - "crypto/rand" "fmt" - "io/ioutil" "net/http" "os" "sort" "strings" "time" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-jsonrpc" - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/beacon" - genesis_chain "github.com/filecoin-project/lotus/chain/gen/genesis" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" - "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/impl" - "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/modules/lp2p" modtest "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" - "github.com/filecoin-project/specs-actors/actors/builtin" saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/verifreg" - "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/gorilla/mux" - "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" influxdb "github.com/kpacha/opencensus-influxdb" - libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" - "github.com/multiformats/go-multiaddr" - ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" "github.com/testground/sdk-go/run" "github.com/testground/sdk-go/runtime" - "github.com/testground/sdk-go/sync" - "go.opencensus.io/stats" "go.opencensus.io/stats/view" ) @@ -76,21 +49,7 @@ func init() { verifreg.MinVerifiedDealSize = big.NewInt(256) } -var ( - PrepareNodeTimeout = time.Minute - - genesisTopic = sync.NewTopic("genesis", &GenesisMsg{}) - balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{}) - presealTopic = sync.NewTopic("preseal", &PresealMsg{}) - - clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{}) - minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddresses{}) - - stateReady = sync.State("ready") - stateDone = sync.State("done") - stateStopMining = sync.State("stop-mining") - stateMinerPickSeqNum = sync.State("miner-pick-seq-num") -) +var PrepareNodeTimeout = time.Minute type TestEnvironment struct { *runtime.RunEnv @@ -117,482 +76,6 @@ type Node struct { MineOne func(context.Context, func(bool)) error } -type InitialBalanceMsg struct { - Addr address.Address - Balance int -} - -type PresealMsg struct { - Miner genesis.Miner - Seqno int64 -} - -type GenesisMsg struct { - Genesis []byte - Bootstrapper []byte -} - -type MinerAddresses struct { - PeerAddr peer.AddrInfo - ActorAddr address.Address -} - -func prepareBootstrapper(t *TestEnvironment) (*Node, error) { - ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) - defer cancel() - - pubsubTracer, err := getPubsubTracerConfig(ctx, t) - if err != nil { - return nil, err - } - - clients := t.IntParam("clients") - miners := t.IntParam("miners") - nodes := clients + miners - - drandOpt, err := getDrandConfig(ctx, t) - if err != nil { - return nil, err - } - - // the first duty of the boostrapper is to construct the genesis block - // first collect all client and miner balances to assign initial funds - balances, err := waitForBalances(t, ctx, nodes) - if err != nil { - return nil, err - } - - // then collect all preseals from miners - preseals, err := collectPreseals(t, ctx, miners) - if err != nil { - return nil, err - } - - // now construct the genesis block - var genesisActors []genesis.Actor - var genesisMiners []genesis.Miner - - for _, bm := range balances { - genesisActors = append(genesisActors, - genesis.Actor{ - Type: genesis.TAccount, - Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)), - Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(), - }) - } - - for _, pm := range preseals { - genesisMiners = append(genesisMiners, pm.Miner) - } - - genesisTemplate := genesis.Template{ - Accounts: genesisActors, - Miners: genesisMiners, - Timestamp: uint64(time.Now().Unix()) - uint64(t.IntParam("genesis_timestamp_offset")), // this needs to be in the past - } - - // dump the genesis block - // var jsonBuf bytes.Buffer - // jsonEnc := json.NewEncoder(&jsonBuf) - // err := jsonEnc.Encode(genesisTemplate) - // if err != nil { - // panic(err) - // } - // runenv.RecordMessage(fmt.Sprintf("Genesis template: %s", string(jsonBuf.Bytes()))) - - // this is horrendously disgusting, we use this contraption to side effect the construction - // of the genesis block in the buffer -- yes, a side effect of dependency injection. - // I remember when software was straightforward... - var genesisBuffer bytes.Buffer - - bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String() - - n := &Node{} - stop, err := node.New(context.Background(), - node.FullAPI(&n.fullApi), - node.Online(), - node.Repo(repo.NewMemory(nil)), - node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)), - withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), - withListenAddress(bootstrapperIP), - withBootstrapper(nil), - withPubsubConfig(true, pubsubTracer), - drandOpt, - ) - if err != nil { - return nil, err - } - n.stop = stop - - var bootstrapperAddr ma.Multiaddr - - bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx) - if err != nil { - stop(context.TODO()) - return nil, err - } - for _, a := range bootstrapperAddrs.Addrs { - ip, err := a.ValueForProtocol(ma.P_IP4) - if err != nil { - continue - } - if ip != bootstrapperIP { - continue - } - addrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ - ID: bootstrapperAddrs.ID, - Addrs: []ma.Multiaddr{a}, - }) - if err != nil { - panic(err) - } - bootstrapperAddr = addrs[0] - break - } - - if bootstrapperAddr == nil { - panic("failed to determine bootstrapper address") - } - - genesisMsg := &GenesisMsg{ - Genesis: genesisBuffer.Bytes(), - Bootstrapper: bootstrapperAddr.Bytes(), - } - t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg) - - t.RecordMessage("waiting for all nodes to be ready") - t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) - - return n, nil -} - -func prepareMiner(t *TestEnvironment) (*Node, error) { - ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) - defer cancel() - - pubsubTracer, err := getPubsubTracerConfig(ctx, t) - if err != nil { - return nil, err - } - - drandOpt, err := getDrandConfig(ctx, t) - if err != nil { - return nil, err - } - - // first create a wallet - walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS) - if err != nil { - return nil, err - } - - // publish the account ID/balance - balance := t.IntParam("balance") - balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance} - t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) - - // create and publish the preseal commitment - priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader) - if err != nil { - return nil, err - } - - minerID, err := peer.IDFromPrivateKey(priv) - if err != nil { - return nil, err - } - - // pick unique sequence number for each miner, no matter in which group they are - seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners")) - - minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1)) - if err != nil { - return nil, err - } - - presealDir, err := ioutil.TempDir("", "preseal") - if err != nil { - return nil, err - } - - sectors := t.IntParam("sectors") - genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo) - if err != nil { - return nil, err - } - genMiner.PeerId = minerID - - t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker) - - presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq} - t.SyncClient.Publish(ctx, presealTopic, presealMsg) - - // then collect the genesis block and bootstrapper address - genesisMsg, err := waitForGenesis(t, ctx) - if err != nil { - return nil, err - } - - // prepare the repo - minerRepo := repo.NewMemory(nil) - - lr, err := minerRepo.Lock(repo.StorageMiner) - if err != nil { - return nil, err - } - - ks, err := lr.KeyStore() - if err != nil { - return nil, err - } - - kbytes, err := priv.Bytes() - if err != nil { - return nil, err - } - - err = ks.Put("libp2p-host", types.KeyInfo{ - Type: "libp2p-host", - PrivateKey: kbytes, - }) - if err != nil { - return nil, err - } - - ds, err := lr.Datastore("/metadata") - if err != nil { - return nil, err - } - - err = ds.Put(datastore.NewKey("miner-address"), minerAddr.Bytes()) - if err != nil { - return nil, err - } - - nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix)) - for i := 0; i < (sectors + 1); i++ { - _, err = nic.Next() - if err != nil { - return nil, err - } - } - - err = lr.Close() - if err != nil { - return nil, err - } - - minerIP := t.NetClient.MustGetDataNetworkIP().String() - - // create the node - // we need both a full node _and_ and storage miner node - n := &Node{} - - nodeRepo := repo.NewMemory(nil) - - stop1, err := node.New(context.Background(), - node.FullAPI(&n.fullApi), - node.Online(), - node.Repo(nodeRepo), - withGenesis(genesisMsg.Genesis), - withListenAddress(minerIP), - withBootstrapper(genesisMsg.Bootstrapper), - withPubsubConfig(false, pubsubTracer), - drandOpt, - ) - if err != nil { - return nil, err - } - - // set the wallet - err = n.setWallet(ctx, walletKey) - if err != nil { - stop1(context.TODO()) - return nil, err - } - - minerOpts := []node.Option{ - node.StorageMiner(&n.minerApi), - node.Online(), - node.Repo(minerRepo), - node.Override(new(api.FullNode), n.fullApi), - withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), - withMinerListenAddress(minerIP), - } - - if t.StringParam("mining_mode") != "natural" { - mineBlock := make(chan func(bool)) - minerOpts = append(minerOpts, - node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr))) - n.MineOne = func(ctx context.Context, cb func(bool)) error { - select { - case mineBlock <- cb: - return nil - case <-ctx.Done(): - return ctx.Err() - } - } - } - - stop2, err := node.New(context.Background(), minerOpts...) - if err != nil { - stop1(context.TODO()) - return nil, err - } - n.stop = func(ctx context.Context) error { - // TODO use a multierror for this - err2 := stop2(ctx) - err1 := stop1(ctx) - if err2 != nil { - return err2 - } - return err1 - } - - registerAndExportMetrics(minerAddr.String()) - - // Bootstrap with full node - remoteAddrs, err := n.fullApi.NetAddrsListen(ctx) - if err != nil { - panic(err) - } - - err = n.minerApi.NetConnect(ctx, remoteAddrs) - if err != nil { - panic(err) - } - - err = startStorMinerAPIServer(minerRepo, n.minerApi) - if err != nil { - return nil, err - } - - // add local storage for presealed sectors - err = n.minerApi.StorageAddLocal(ctx, presealDir) - if err != nil { - n.stop(context.TODO()) - return nil, err - } - - // set the miner PeerID - minerIDEncoded, err := actors.SerializeParams(&saminer.ChangePeerIDParams{NewID: abi.PeerID(minerID)}) - if err != nil { - return nil, err - } - - changeMinerID := &types.Message{ - To: minerAddr, - From: genMiner.Worker, - Method: builtin.MethodsMiner.ChangePeerID, - Params: minerIDEncoded, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: 1000000, - } - - _, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID) - if err != nil { - n.stop(context.TODO()) - return nil, err - } - - t.RecordMessage("publish our address to the miners addr topic") - actoraddress, err := n.minerApi.ActorAddress(ctx) - if err != nil { - return nil, err - } - addrinfo, err := n.minerApi.NetAddrsListen(ctx) - if err != nil { - return nil, err - } - t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddresses{addrinfo, actoraddress}) - - t.RecordMessage("waiting for all nodes to be ready") - t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) - - return n, err -} - -func prepareClient(t *TestEnvironment) (*Node, error) { - ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) - defer cancel() - - pubsubTracer, err := getPubsubTracerConfig(ctx, t) - if err != nil { - return nil, err - } - - drandOpt, err := getDrandConfig(ctx, t) - if err != nil { - return nil, err - } - - // first create a wallet - walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS) - if err != nil { - return nil, err - } - - // publish the account ID/balance - balance := t.IntParam("balance") - balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance} - t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) - - // then collect the genesis block and bootstrapper address - genesisMsg, err := waitForGenesis(t, ctx) - if err != nil { - return nil, err - } - - clientIP := t.NetClient.MustGetDataNetworkIP().String() - - nodeRepo := repo.NewMemory(nil) - - // create the node - n := &Node{} - stop, err := node.New(context.Background(), - node.FullAPI(&n.fullApi), - node.Online(), - node.Repo(nodeRepo), - withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), - withGenesis(genesisMsg.Genesis), - withListenAddress(clientIP), - withBootstrapper(genesisMsg.Bootstrapper), - withPubsubConfig(false, pubsubTracer), - drandOpt, - ) - if err != nil { - return nil, err - } - n.stop = stop - - // set the wallet - err = n.setWallet(ctx, walletKey) - if err != nil { - stop(context.TODO()) - return nil, err - } - - err = startClientAPIServer(nodeRepo, n.fullApi) - if err != nil { - return nil, err - } - - registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq)) - - t.RecordMessage("publish our address to the clients addr topic") - addrinfo, err := n.fullApi.NetAddrsListen(ctx) - if err != nil { - return nil, err - } - t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo) - - t.RecordMessage("waiting for all nodes to be ready") - t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) - - return n, nil -} - func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error { _, err := n.fullApi.WalletImport(ctx, &walletKey.KeyInfo) if err != nil { @@ -607,58 +90,6 @@ func (n *Node) setWallet(ctx context.Context, walletKey *wallet.Key) error { return nil } -func withGenesis(gb []byte) node.Option { - return node.Override(new(modules.Genesis), modules.LoadGenesis(gb)) -} - -func withBootstrapper(ab []byte) node.Option { - return node.Override(new(dtypes.BootstrapPeers), - func() (dtypes.BootstrapPeers, error) { - if ab == nil { - return dtypes.BootstrapPeers{}, nil - } - - a, err := ma.NewMultiaddrBytes(ab) - if err != nil { - return nil, err - } - ai, err := peer.AddrInfoFromP2pAddr(a) - if err != nil { - return nil, err - } - return dtypes.BootstrapPeers{*ai}, nil - }) -} - -func withPubsubConfig(bootstrapper bool, pubsubTracer string) node.Option { - return node.Override(new(*config.Pubsub), func() *config.Pubsub { - return &config.Pubsub{ - Bootstrapper: bootstrapper, - RemoteTracer: pubsubTracer, - } - }) -} - -func withListenAddress(ip string) node.Option { - addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4001", ip)} - return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) -} - -func withMinerListenAddress(ip string) node.Option { - addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)} - return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) -} - -func withApiEndpoint(addr string) node.Option { - return node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return err - } - return lr.SetAPIEndpoint(apima) - }) -} - func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) { ch := make(chan *InitialBalanceMsg) sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch) @@ -709,11 +140,11 @@ func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error } } -func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) { - ch := make(chan MinerAddresses) +func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddressesMsg, error) { + ch := make(chan MinerAddressesMsg) sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch) - addrs := make([]MinerAddresses, 0, miners) + addrs := make([]MinerAddressesMsg, 0, miners) for i := 0; i < miners; i++ { select { case a := <-ch: @@ -743,7 +174,7 @@ func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([ return addrs, nil } -func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, error) { +func getPubsubTracerMaddr(ctx context.Context, t *TestEnvironment) (string, error) { if !t.BooleanParam("enable_pubsub_tracer") { return "", nil } @@ -753,13 +184,13 @@ func getPubsubTracerConfig(ctx context.Context, t *TestEnvironment) (string, err select { case m := <-ch: - return m.Tracer, nil + return m.Multiaddr, nil case err := <-sub.Done(): return "", fmt.Errorf("got error while waiting for pubsub tracer config: %w", err) } } -func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error) { +func getDrandOpts(ctx context.Context, t *TestEnvironment) (node.Option, error) { beaconType := t.StringParam("random_beacon_type") switch beaconType { case "external-drand": @@ -795,42 +226,6 @@ func getDrandConfig(ctx context.Context, t *TestEnvironment) (node.Option, error } } -func startStorMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error { - mux := mux.NewRouter() - - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerApi)) - - mux.Handle("/rpc/v0", rpcServer) - mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote) - mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof - - ah := &auth.Handler{ - Verify: minerApi.AuthVerify, - Next: mux.ServeHTTP, - } - - srv := &http.Server{Handler: ah} - - return startServer(repo, srv) -} - -func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error { - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api)) - - ah := &auth.Handler{ - Verify: api.AuthVerify, - Next: rpcServer.ServeHTTP, - } - - http.Handle("/rpc/v0", ah) - - srv := &http.Server{Handler: http.DefaultServeMux} - - return startServer(repo, srv) -} - func startServer(repo *repo.MemRepo, srv *http.Server) error { endpoint, err := repo.APIEndpoint() if err != nil { diff --git a/lotus-soup/retrieval.go b/lotus-soup/retrieval.go new file mode 100644 index 000000000..97cadaca6 --- /dev/null +++ b/lotus-soup/retrieval.go @@ -0,0 +1,103 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/filecoin-project/lotus/api" + "github.com/ipfs/go-cid" + files "github.com/ipfs/go-ipfs-files" + ipld "github.com/ipfs/go-ipld-format" + dag "github.com/ipfs/go-merkledag" + dstest "github.com/ipfs/go-merkledag/test" + unixfile "github.com/ipfs/go-unixfs/file" + "github.com/ipld/go-car" +) + +func retrieveData(t *TestEnvironment, ctx context.Context, err error, client api.FullNode, fcid cid.Cid, carExport bool, data []byte) { + t1 := time.Now() + offers, err := client.ClientFindData(ctx, fcid) + if err != nil { + panic(err) + } + for _, o := range offers { + t.D().Counter(fmt.Sprintf("find-data.offer,miner=%s", o.Miner)).Inc(1) + } + t.D().ResettingHistogram("find-data").Update(int64(time.Since(t1))) + + if len(offers) < 1 { + panic("no offers") + } + + rpath, err := ioutil.TempDir("", "lotus-retrieve-test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(rpath) + + caddr, err := client.WalletDefaultAddress(ctx) + if err != nil { + panic(err) + } + + ref := &api.FileRef{ + Path: filepath.Join(rpath, "ret"), + IsCAR: carExport, + } + t1 = time.Now() + err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) + if err != nil { + panic(err) + } + t.D().ResettingHistogram("retrieve-data").Update(int64(time.Since(t1))) + + rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret")) + if err != nil { + panic(err) + } + + if carExport { + rdata = extractCarData(ctx, rdata, rpath) + } + + if !bytes.Equal(rdata, data) { + panic("wrong data retrieved") + } + + t.RecordMessage("retrieved successfully") +} + +func extractCarData(ctx context.Context, rdata []byte, rpath string) []byte { + bserv := dstest.Bserv() + ch, err := car.LoadCar(bserv.Blockstore(), bytes.NewReader(rdata)) + if err != nil { + panic(err) + } + b, err := bserv.GetBlock(ctx, ch.Roots[0]) + if err != nil { + panic(err) + } + nd, err := ipld.Decode(b) + if err != nil { + panic(err) + } + dserv := dag.NewDAGService(bserv) + fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd) + if err != nil { + panic(err) + } + outPath := filepath.Join(rpath, "retLoadedCAR") + if err := files.WriteTo(fil, outPath); err != nil { + panic(err) + } + rdata, err = ioutil.ReadFile(outPath) + if err != nil { + panic(err) + } + return rdata +} diff --git a/lotus-soup/role_bootstrapper.go b/lotus-soup/role_bootstrapper.go new file mode 100644 index 000000000..a6bfa42c3 --- /dev/null +++ b/lotus-soup/role_bootstrapper.go @@ -0,0 +1,161 @@ +package main + +import ( + "bytes" + "context" + "time" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/genesis" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/modules" + modtest "github.com/filecoin-project/lotus/node/modules/testing" + "github.com/filecoin-project/lotus/node/repo" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +func runBootstrapper(t *TestEnvironment) error { + t.RecordMessage("running bootstrapper") + _, err := prepareBootstrapper(t) + if err != nil { + return err + } + + ctx := context.Background() + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + return nil +} + +func prepareBootstrapper(t *TestEnvironment) (*Node, error) { + ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) + defer cancel() + + pubsubTracer, err := getPubsubTracerMaddr(ctx, t) + if err != nil { + return nil, err + } + + clients := t.IntParam("clients") + miners := t.IntParam("miners") + nodes := clients + miners + + drandOpt, err := getDrandOpts(ctx, t) + if err != nil { + return nil, err + } + + // the first duty of the boostrapper is to construct the genesis block + // first collect all client and miner balances to assign initial funds + balances, err := waitForBalances(t, ctx, nodes) + if err != nil { + return nil, err + } + + // then collect all preseals from miners + preseals, err := collectPreseals(t, ctx, miners) + if err != nil { + return nil, err + } + + // now construct the genesis block + var genesisActors []genesis.Actor + var genesisMiners []genesis.Miner + + for _, bm := range balances { + genesisActors = append(genesisActors, + genesis.Actor{ + Type: genesis.TAccount, + Balance: big.Mul(big.NewInt(int64(bm.Balance)), types.NewInt(build.FilecoinPrecision)), + Meta: (&genesis.AccountMeta{Owner: bm.Addr}).ActorMeta(), + }) + } + + for _, pm := range preseals { + genesisMiners = append(genesisMiners, pm.Miner) + } + + genesisTemplate := genesis.Template{ + Accounts: genesisActors, + Miners: genesisMiners, + Timestamp: uint64(time.Now().Unix()) - uint64(t.IntParam("genesis_timestamp_offset")), // this needs to be in the past + } + + // dump the genesis block + // var jsonBuf bytes.Buffer + // jsonEnc := json.NewEncoder(&jsonBuf) + // err := jsonEnc.Encode(genesisTemplate) + // if err != nil { + // panic(err) + // } + // runenv.RecordMessage(fmt.Sprintf("Genesis template: %s", string(jsonBuf.Bytes()))) + + // this is horrendously disgusting, we use this contraption to side effect the construction + // of the genesis block in the buffer -- yes, a side effect of dependency injection. + // I remember when software was straightforward... + var genesisBuffer bytes.Buffer + + bootstrapperIP := t.NetClient.MustGetDataNetworkIP().String() + + n := &Node{} + stop, err := node.New(context.Background(), + node.FullAPI(&n.fullApi), + node.Online(), + node.Repo(repo.NewMemory(nil)), + node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genesisBuffer, genesisTemplate)), + withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), + withListenAddress(bootstrapperIP), + withBootstrapper(nil), + withPubsubConfig(true, pubsubTracer), + drandOpt, + ) + if err != nil { + return nil, err + } + n.stop = stop + + var bootstrapperAddr ma.Multiaddr + + bootstrapperAddrs, err := n.fullApi.NetAddrsListen(ctx) + if err != nil { + stop(context.TODO()) + return nil, err + } + for _, a := range bootstrapperAddrs.Addrs { + ip, err := a.ValueForProtocol(ma.P_IP4) + if err != nil { + continue + } + if ip != bootstrapperIP { + continue + } + addrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ + ID: bootstrapperAddrs.ID, + Addrs: []ma.Multiaddr{a}, + }) + if err != nil { + panic(err) + } + bootstrapperAddr = addrs[0] + break + } + + if bootstrapperAddr == nil { + panic("failed to determine bootstrapper address") + } + + genesisMsg := &GenesisMsg{ + Genesis: genesisBuffer.Bytes(), + Bootstrapper: bootstrapperAddr.Bytes(), + } + t.SyncClient.MustPublish(ctx, genesisTopic, genesisMsg) + + t.RecordMessage("waiting for all nodes to be ready") + t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) + + return n, nil +} diff --git a/lotus-soup/role_client.go b/lotus-soup/role_client.go new file mode 100644 index 000000000..b3cbe4b02 --- /dev/null +++ b/lotus-soup/role_client.go @@ -0,0 +1,194 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "os" + "time" + + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/repo" + + "github.com/filecoin-project/specs-actors/actors/crypto" +) + +func runBaselineClient(t *TestEnvironment) error { + t.RecordMessage("running client") + cl, err := prepareClient(t) + if err != nil { + return err + } + + ctx := context.Background() + addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners")) + if err != nil { + return err + } + t.RecordMessage("got %v miner addrs", len(addrs)) + + client := cl.fullApi + + // select a random miner + minerAddr := addrs[rand.Intn(len(addrs))] + if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil { + return err + } + t.D().Counter(fmt.Sprintf("send-data-to,miner=%s", minerAddr.ActorAddr)).Inc(1) + + t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr) + + time.Sleep(2 * time.Second) + + // generate 1600 bytes of random data + data := make([]byte, 1600) + rand.New(rand.NewSource(time.Now().UnixNano())).Read(data) + + file, err := ioutil.TempFile("/tmp", "data") + if err != nil { + return err + } + defer os.Remove(file.Name()) + + _, err = file.Write(data) + if err != nil { + return err + } + + fcid, err := client.ClientImport(ctx, api.FileRef{Path: file.Name(), IsCAR: false}) + if err != nil { + return err + } + t.RecordMessage("file cid: %s", fcid) + + // start deal + t1 := time.Now() + deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid) + t.RecordMessage("started deal: %s", deal) + + // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this + time.Sleep(2 * time.Second) + + t.RecordMessage("waiting for deal to be sealed") + waitDealSealed(t, ctx, client, deal) + t.D().ResettingHistogram("deal.sealed").Update(int64(time.Since(t1))) + + carExport := true + + t.RecordMessage("trying to retrieve %s", fcid) + retrieveData(t, ctx, err, client, fcid, carExport, data) + t.D().ResettingHistogram("deal.retrieved").Update(int64(time.Since(t1))) + + t.SyncClient.MustSignalEntry(ctx, stateStopMining) + + time.Sleep(10 * time.Second) // wait for metrics to be emitted + + // TODO broadcast published content CIDs to other clients + // TODO select a random piece of content published by some other client and retrieve it + + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + return nil +} + +func prepareClient(t *TestEnvironment) (*Node, error) { + ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) + defer cancel() + + pubsubTracer, err := getPubsubTracerMaddr(ctx, t) + if err != nil { + return nil, err + } + + drandOpt, err := getDrandOpts(ctx, t) + if err != nil { + return nil, err + } + + // first create a wallet + walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS) + if err != nil { + return nil, err + } + + // publish the account ID/balance + balance := t.IntParam("balance") + balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance} + t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) + + // then collect the genesis block and bootstrapper address + genesisMsg, err := waitForGenesis(t, ctx) + if err != nil { + return nil, err + } + + clientIP := t.NetClient.MustGetDataNetworkIP().String() + + nodeRepo := repo.NewMemory(nil) + + // create the node + n := &Node{} + stop, err := node.New(context.Background(), + node.FullAPI(&n.fullApi), + node.Online(), + node.Repo(nodeRepo), + withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), + withGenesis(genesisMsg.Genesis), + withListenAddress(clientIP), + withBootstrapper(genesisMsg.Bootstrapper), + withPubsubConfig(false, pubsubTracer), + drandOpt, + ) + if err != nil { + return nil, err + } + n.stop = stop + + // set the wallet + err = n.setWallet(ctx, walletKey) + if err != nil { + stop(context.TODO()) + return nil, err + } + + err = startClientAPIServer(nodeRepo, n.fullApi) + if err != nil { + return nil, err + } + + registerAndExportMetrics(fmt.Sprintf("client_%d", t.GroupSeq)) + + t.RecordMessage("publish our address to the clients addr topic") + addrinfo, err := n.fullApi.NetAddrsListen(ctx) + if err != nil { + return nil, err + } + t.SyncClient.MustPublish(ctx, clientsAddrsTopic, addrinfo) + + t.RecordMessage("waiting for all nodes to be ready") + t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) + + return n, nil +} + +func startClientAPIServer(repo *repo.MemRepo, api api.FullNode) error { + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(api)) + + ah := &auth.Handler{ + Verify: api.AuthVerify, + Next: rpcServer.ServeHTTP, + } + + http.Handle("/rpc/v0", ah) + + srv := &http.Server{Handler: http.DefaultServeMux} + + return startServer(repo, srv) +} diff --git a/lotus-soup/drand.go b/lotus-soup/role_drand.go similarity index 94% rename from lotus-soup/drand.go rename to lotus-soup/role_drand.go index 507b05060..bd855c69a 100644 --- a/lotus-soup/drand.go +++ b/lotus-soup/role_drand.go @@ -14,28 +14,19 @@ import ( "github.com/drand/drand/chain" hclient "github.com/drand/drand/client/http" + "github.com/drand/drand/demo/node" "github.com/drand/drand/log" "github.com/drand/drand/lp2p" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/testground/sdk-go/sync" - - "github.com/drand/drand/demo/node" ) -var ( - PrepareDrandTimeout = time.Minute - drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{}) -) - -type DrandRuntimeInfo struct { - Config dtypes.DrandConfig - GossipBootstrap dtypes.DrandBootstrap -} +var PrepareDrandTimeout = time.Minute type DrandInstance struct { - Node node.Node + Node node.Node GossipRelay *lp2p.GossipRelayNode stateDir string @@ -45,17 +36,18 @@ func (d *DrandInstance) Cleanup() error { return os.RemoveAll(d.stateDir) } -// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node -// you can use the returned dtypes.DrandConfig to override the default production config. -func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) { - ch := make(chan *DrandRuntimeInfo, 1) - sub := client.MustSubscribe(ctx, drandConfigTopic, ch) - select { - case cfg := <-ch: - return cfg, nil - case err := <-sub.Done(): - return nil, err +func runDrandNode(t *TestEnvironment) error { + t.RecordMessage("running drand node") + dr, err := prepareDrandNode(t) + if err != nil { + return err } + defer dr.Cleanup() + + // TODO add ability to halt / recover on demand + ctx := context.Background() + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + return nil } // prepareDrandNode starts a drand instance and runs a DKG with the other members of the composition group. @@ -228,12 +220,25 @@ func prepareDrandNode(t *TestEnvironment) (*DrandInstance, error) { } return &DrandInstance{ - Node: n, + Node: n, GossipRelay: gossipRelay, - stateDir: stateDir, + stateDir: stateDir, }, nil } +// waitForDrandConfig should be called by filecoin instances before constructing the lotus Node +// you can use the returned dtypes.DrandConfig to override the default production config. +func waitForDrandConfig(ctx context.Context, client sync.Client) (*DrandRuntimeInfo, error) { + ch := make(chan *DrandRuntimeInfo, 1) + sub := client.MustSubscribe(ctx, drandConfigTopic, ch) + select { + case cfg := <-ch: + return cfg, nil + case err := <-sub.Done(): + return nil, err + } +} + func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) { for _, a := range addrs { if ip, _ := a.ValueForProtocol(ma.P_IP4); ip != dataIP.String() { @@ -242,4 +247,4 @@ func relayAddrInfo(addrs []ma.Multiaddr, dataIP net.IP) (*peer.AddrInfo, error) return peer.AddrInfoFromP2pAddr(a) } return nil, fmt.Errorf("no addr found with data ip %s in addrs: %v", dataIP, addrs) -} \ No newline at end of file +} diff --git a/lotus-soup/role_miner.go b/lotus-soup/role_miner.go new file mode 100644 index 000000000..fa022592c --- /dev/null +++ b/lotus-soup/role_miner.go @@ -0,0 +1,377 @@ +package main + +import ( + "context" + "crypto/rand" + "fmt" + "io/ioutil" + "net/http" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-storedcounter" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + genesis_chain "github.com/filecoin-project/lotus/chain/gen/genesis" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" + "github.com/filecoin-project/lotus/miner" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/impl" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/repo" + "github.com/gorilla/mux" + libp2p_crypto "github.com/libp2p/go-libp2p-core/crypto" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + saminer "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/crypto" + + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/testground/sdk-go/sync" +) + +func runMiner(t *TestEnvironment) error { + t.RecordMessage("running miner") + miner, err := prepareMiner(t) + if err != nil { + return err + } + + t.RecordMessage("block delay: %v", build.BlockDelay) + t.D().Gauge("miner.block-delay").Update(build.BlockDelay) + + ctx := context.Background() + + clients := t.IntParam("clients") + miners := t.IntParam("miners") + + myActorAddr, err := miner.minerApi.ActorAddress(ctx) + if err != nil { + return err + } + + // mine / stop mining + mine := true + done := make(chan struct{}) + + if miner.MineOne != nil { + go func() { + defer t.RecordMessage("shutting down mining") + defer close(done) + + var i int + for i = 0; mine; i++ { + // synchronize all miners to mine the next block + t.RecordMessage("synchronizing all miners to mine next block [%d]", i) + stateMineNext := sync.State(fmt.Sprintf("mine-block-%d", i)) + t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners) + + ch := make(chan struct{}) + err := miner.MineOne(ctx, func(mined bool) { + if mined { + t.D().Counter(fmt.Sprintf("block.mine,miner=%s", myActorAddr)).Inc(1) + } + close(ch) + }) + if err != nil { + panic(err) + } + <-ch + } + + // signal the last block to make sure no miners are left stuck waiting for the next block signal + // while the others have stopped + stateMineLast := sync.State(fmt.Sprintf("mine-block-%d", i)) + t.SyncClient.MustSignalEntry(ctx, stateMineLast) + }() + } else { + close(done) + } + + // wait for a signal from all clients to stop mining + err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C + if err != nil { + return err + } + + mine = false + <-done + + t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) + return nil +} + +func prepareMiner(t *TestEnvironment) (*Node, error) { + ctx, cancel := context.WithTimeout(context.Background(), PrepareNodeTimeout) + defer cancel() + + pubsubTracer, err := getPubsubTracerMaddr(ctx, t) + if err != nil { + return nil, err + } + + drandOpt, err := getDrandOpts(ctx, t) + if err != nil { + return nil, err + } + + // first create a wallet + walletKey, err := wallet.GenerateKey(crypto.SigTypeBLS) + if err != nil { + return nil, err + } + + // publish the account ID/balance + balance := t.IntParam("balance") + balanceMsg := &InitialBalanceMsg{Addr: walletKey.Address, Balance: balance} + t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) + + // create and publish the preseal commitment + priv, _, err := libp2p_crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return nil, err + } + + minerID, err := peer.IDFromPrivateKey(priv) + if err != nil { + return nil, err + } + + // pick unique sequence number for each miner, no matter in which group they are + seq := t.SyncClient.MustSignalAndWait(ctx, stateMinerPickSeqNum, t.IntParam("miners")) + + minerAddr, err := address.NewIDAddress(genesis_chain.MinerStart + uint64(seq-1)) + if err != nil { + return nil, err + } + + presealDir, err := ioutil.TempDir("", "preseal") + if err != nil { + return nil, err + } + + sectors := t.IntParam("sectors") + genMiner, _, err := seed.PreSeal(minerAddr, abi.RegisteredSealProof_StackedDrg2KiBV1, 0, sectors, presealDir, []byte("TODO: randomize this"), &walletKey.KeyInfo) + if err != nil { + return nil, err + } + genMiner.PeerId = minerID + + t.RecordMessage("Miner Info: Owner: %s Worker: %s", genMiner.Owner, genMiner.Worker) + + presealMsg := &PresealMsg{Miner: *genMiner, Seqno: seq} + t.SyncClient.Publish(ctx, presealTopic, presealMsg) + + // then collect the genesis block and bootstrapper address + genesisMsg, err := waitForGenesis(t, ctx) + if err != nil { + return nil, err + } + + // prepare the repo + minerRepo := repo.NewMemory(nil) + + lr, err := minerRepo.Lock(repo.StorageMiner) + if err != nil { + return nil, err + } + + ks, err := lr.KeyStore() + if err != nil { + return nil, err + } + + kbytes, err := priv.Bytes() + if err != nil { + return nil, err + } + + err = ks.Put("libp2p-host", types.KeyInfo{ + Type: "libp2p-host", + PrivateKey: kbytes, + }) + if err != nil { + return nil, err + } + + ds, err := lr.Datastore("/metadata") + if err != nil { + return nil, err + } + + err = ds.Put(datastore.NewKey("miner-address"), minerAddr.Bytes()) + if err != nil { + return nil, err + } + + nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix)) + for i := 0; i < (sectors + 1); i++ { + _, err = nic.Next() + if err != nil { + return nil, err + } + } + + err = lr.Close() + if err != nil { + return nil, err + } + + minerIP := t.NetClient.MustGetDataNetworkIP().String() + + // create the node + // we need both a full node _and_ and storage miner node + n := &Node{} + + nodeRepo := repo.NewMemory(nil) + + stop1, err := node.New(context.Background(), + node.FullAPI(&n.fullApi), + node.Online(), + node.Repo(nodeRepo), + withGenesis(genesisMsg.Genesis), + withListenAddress(minerIP), + withBootstrapper(genesisMsg.Bootstrapper), + withPubsubConfig(false, pubsubTracer), + drandOpt, + ) + if err != nil { + return nil, err + } + + // set the wallet + err = n.setWallet(ctx, walletKey) + if err != nil { + stop1(context.TODO()) + return nil, err + } + + minerOpts := []node.Option{ + node.StorageMiner(&n.minerApi), + node.Online(), + node.Repo(minerRepo), + node.Override(new(api.FullNode), n.fullApi), + withApiEndpoint("/ip4/127.0.0.1/tcp/1234"), + withMinerListenAddress(minerIP), + } + + if t.StringParam("mining_mode") != "natural" { + mineBlock := make(chan func(bool)) + minerOpts = append(minerOpts, + node.Override(new(*miner.Miner), miner.NewTestMiner(mineBlock, minerAddr))) + n.MineOne = func(ctx context.Context, cb func(bool)) error { + select { + case mineBlock <- cb: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + } + + stop2, err := node.New(context.Background(), minerOpts...) + if err != nil { + stop1(context.TODO()) + return nil, err + } + n.stop = func(ctx context.Context) error { + // TODO use a multierror for this + err2 := stop2(ctx) + err1 := stop1(ctx) + if err2 != nil { + return err2 + } + return err1 + } + + registerAndExportMetrics(minerAddr.String()) + + // Bootstrap with full node + remoteAddrs, err := n.fullApi.NetAddrsListen(ctx) + if err != nil { + panic(err) + } + + err = n.minerApi.NetConnect(ctx, remoteAddrs) + if err != nil { + panic(err) + } + + err = startStorMinerAPIServer(minerRepo, n.minerApi) + if err != nil { + return nil, err + } + + // add local storage for presealed sectors + err = n.minerApi.StorageAddLocal(ctx, presealDir) + if err != nil { + n.stop(context.TODO()) + return nil, err + } + + // set the miner PeerID + minerIDEncoded, err := actors.SerializeParams(&saminer.ChangePeerIDParams{NewID: abi.PeerID(minerID)}) + if err != nil { + return nil, err + } + + changeMinerID := &types.Message{ + To: minerAddr, + From: genMiner.Worker, + Method: builtin.MethodsMiner.ChangePeerID, + Params: minerIDEncoded, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: 1000000, + } + + _, err = n.fullApi.MpoolPushMessage(ctx, changeMinerID) + if err != nil { + n.stop(context.TODO()) + return nil, err + } + + t.RecordMessage("publish our address to the miners addr topic") + actoraddress, err := n.minerApi.ActorAddress(ctx) + if err != nil { + return nil, err + } + addrinfo, err := n.minerApi.NetAddrsListen(ctx) + if err != nil { + return nil, err + } + t.SyncClient.MustPublish(ctx, minersAddrsTopic, MinerAddressesMsg{addrinfo, actoraddress}) + + t.RecordMessage("waiting for all nodes to be ready") + t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) + + return n, err +} + +func startStorMinerAPIServer(repo *repo.MemRepo, minerApi api.StorageMiner) error { + mux := mux.NewRouter() + + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerApi)) + + mux.Handle("/rpc/v0", rpcServer) + mux.PathPrefix("/remote").HandlerFunc(minerApi.(*impl.StorageMinerAPI).ServeRemote) + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + + ah := &auth.Handler{ + Verify: minerApi.AuthVerify, + Next: mux.ServeHTTP, + } + + srv := &http.Server{Handler: ah} + + return startServer(repo, srv) +} diff --git a/lotus-soup/tracer.go b/lotus-soup/role_pubsub_tracer.go similarity index 89% rename from lotus-soup/tracer.go rename to lotus-soup/role_pubsub_tracer.go index 17a287615..e91e94847 100644 --- a/lotus-soup/tracer.go +++ b/lotus-soup/role_pubsub_tracer.go @@ -5,8 +5,6 @@ import ( "crypto/rand" "fmt" - "github.com/testground/sdk-go/sync" - "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -15,19 +13,11 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -var ( - pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{}) -) - type PubsubTracer struct { host host.Host traced *traced.TraceCollector } -type PubsubTracerMsg struct { - Tracer string -} - func (tr *PubsubTracer) Stop() error { tr.traced.Stop() return tr.host.Close() @@ -63,7 +53,7 @@ func preparePubsubTracer(t *TestEnvironment) (*PubsubTracer, error) { t.RecordMessage("I am %s", tracedMultiaddrStr) _ = ma.StringCast(tracedMultiaddrStr) - tracedMsg := &PubsubTracerMsg{Tracer: tracedMultiaddrStr} + tracedMsg := &PubsubTracerMsg{Multiaddr: tracedMultiaddrStr} t.SyncClient.MustPublish(ctx, pubsubTracerTopic, tracedMsg) t.RecordMessage("waiting for all nodes to be ready") diff --git a/lotus-soup/roles.go b/lotus-soup/roles.go new file mode 100644 index 000000000..a5d77db5a --- /dev/null +++ b/lotus-soup/roles.go @@ -0,0 +1,27 @@ +package main + +// This is the baseline test; Filecoin 101. +// +// A network with a bootstrapper, a number of miners, and a number of clients/full nodes +// is constructed and connected through the bootstrapper. +// Some funds are allocated to each node and a number of sectors are presealed in the genesis block. +// +// The test plan: +// One or more clients store content to one or more miners, testing storage deals. +// The plan ensures that the storage deals hit the blockchain and measure the time it took. +// Verification: one or more clients retrieve and verify the hashes of stored content. +// The plan ensures that all (previously) published content can be correctly retrieved +// and measures the time it took. +// +// Preparation of the genesis block: this is the responsibility of the bootstrapper. +// In order to compute the genesis block, we need to collect identities and presealed +// sectors from each node. +// Then we create a genesis block that allocates some funds to each node and collects +// the presealed sectors. +var basicRoles = map[string]func(*TestEnvironment) error{ + "bootstrapper": runBootstrapper, + "miner": runMiner, + "client": runBaselineClient, + "drand": runDrandNode, + "pubsub-tracer": runPubsubTracer, +} diff --git a/lotus-soup/sync.go b/lotus-soup/sync.go new file mode 100644 index 000000000..099d31283 --- /dev/null +++ b/lotus-soup/sync.go @@ -0,0 +1,55 @@ +package main + +import ( + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/genesis" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/testground/sdk-go/sync" +) + +var ( + genesisTopic = sync.NewTopic("genesis", &GenesisMsg{}) + balanceTopic = sync.NewTopic("balance", &InitialBalanceMsg{}) + presealTopic = sync.NewTopic("preseal", &PresealMsg{}) + clientsAddrsTopic = sync.NewTopic("clientsAddrsTopic", &peer.AddrInfo{}) + minersAddrsTopic = sync.NewTopic("minersAddrsTopic", &MinerAddressesMsg{}) + pubsubTracerTopic = sync.NewTopic("pubsubTracer", &PubsubTracerMsg{}) + drandConfigTopic = sync.NewTopic("drand-config", &DrandRuntimeInfo{}) +) + +var ( + stateReady = sync.State("ready") + stateDone = sync.State("done") + stateStopMining = sync.State("stop-mining") + stateMinerPickSeqNum = sync.State("miner-pick-seq-num") +) + +type InitialBalanceMsg struct { + Addr address.Address + Balance int +} + +type PresealMsg struct { + Miner genesis.Miner + Seqno int64 +} + +type GenesisMsg struct { + Genesis []byte + Bootstrapper []byte +} + +type MinerAddressesMsg struct { + PeerAddr peer.AddrInfo + ActorAddr address.Address +} + +type PubsubTracerMsg struct { + Multiaddr string +} + +type DrandRuntimeInfo struct { + Config dtypes.DrandConfig + GossipBootstrap dtypes.DrandBootstrap +}