Merge pull request #63 from filecoin-project/basic-deal-flow-cleanup

Basic deal flow: some clean up and multi client/miner test readiness
This commit is contained in:
vyzo 2020-06-25 14:50:30 +03:00 committed by GitHub
commit 6f298b929e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 55 deletions

View File

@ -22,7 +22,6 @@ import (
dstest "github.com/ipfs/go-merkledag/test" dstest "github.com/ipfs/go-merkledag/test"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car" "github.com/ipld/go-car"
"github.com/libp2p/go-libp2p-core/peer"
) )
// This is the basline test; Filecoin 101. // This is the basline test; Filecoin 101.
@ -69,12 +68,9 @@ func runBaselineMiner(t *TestEnvironment) error {
} }
ctx := context.Background() ctx := context.Background()
addrs, err := collectClientsAddrs(t, ctx, t.IntParam("clients"))
if err != nil {
return err
}
t.RecordMessage("got %v client addrs", len(addrs)) clients := t.IntParam("clients")
miners := t.IntParam("miners")
// mine / stop mining // mine / stop mining
mine := true mine := true
@ -82,20 +78,25 @@ func runBaselineMiner(t *TestEnvironment) error {
go func() { go func() {
defer close(done) defer close(done)
for mine { for mine {
time.Sleep(100 * time.Millisecond)
//t.RecordMessage("mine one block") // synchronize all miners to mine the next block
t.RecordMessage("synchronizing all miners to mine next block")
t.SyncClient.MustSignalAndWait(ctx, stateMineNext, miners)
time.Sleep(time.Duration(rand.Intn(int(100 * time.Millisecond))))
// wait and synchronise // wait and synchronise
if err := miner.MineOne(ctx, func(bool) { err := miner.MineOne(ctx, func(bool) {
// after a block is mined // after a block is mined
}); err != nil { })
if err != nil {
panic(err) panic(err)
} }
} }
}() }()
// wait for a signa to stop mining // wait for a signal from all clients to stop mining
err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, 1).C err = <-t.SyncClient.MustBarrier(ctx, stateStopMining, clients).C
if err != nil { if err != nil {
return err return err
} }
@ -116,7 +117,7 @@ func runBaselineClient(t *TestEnvironment) error {
} }
ctx := context.Background() ctx := context.Background()
addrs, err := collectMinersAddrs(t, ctx, t.IntParam("miners")) addrs, err := collectMinerAddrs(t, ctx, t.IntParam("miners"))
if err != nil { if err != nil {
return err return err
} }
@ -125,11 +126,13 @@ func runBaselineClient(t *TestEnvironment) error {
t.RecordMessage("got %v miner addrs", len(addrs)) t.RecordMessage("got %v miner addrs", len(addrs))
if err := client.NetConnect(ctx, addrs[0].PeerAddr); err != nil { // select a random miner
minerAddr := addrs[rand.Intn(len(addrs))]
if err := client.NetConnect(ctx, minerAddr.PeerAddr); err != nil {
return err return err
} }
t.RecordMessage("client connected to miner") t.RecordMessage("selected %s as the miner", minerAddr.ActorAddr)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
@ -144,19 +147,19 @@ func runBaselineClient(t *TestEnvironment) error {
t.RecordMessage("file cid: %s", fcid) t.RecordMessage("file cid: %s", fcid)
// start deal // start deal
deal := startDeal(ctx, addrs[0].ActorAddr, client, fcid) deal := startDeal(ctx, minerAddr.ActorAddr, client, fcid)
t.RecordMessage("started deal: %s", deal) t.RecordMessage("started deal: %s", deal)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
t.RecordMessage("wait to be sealed") t.RecordMessage("waiting for deal to be sealed")
waitDealSealed(t, ctx, client, deal) waitDealSealed(t, ctx, client, deal)
carExport := true carExport := true
t.RecordMessage("try to retrieve fcid") t.RecordMessage("trying to retrieve %s", fcid)
retrieve(t, ctx, err, client, fcid, carExport, data) retrieveData(t, ctx, err, client, fcid, carExport, data)
t.SyncClient.MustSignalEntry(ctx, stateStopMining) t.SyncClient.MustSignalEntry(ctx, stateStopMining)
@ -167,40 +170,6 @@ func runBaselineClient(t *TestEnvironment) error {
return nil return nil
} }
func collectMinersAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) {
ch := make(chan MinerAddresses)
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch)
addrs := make([]MinerAddresses, 0, miners)
for i := 0; i < miners; i++ {
select {
case a := <-ch:
addrs = append(addrs, a)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err)
}
}
return addrs, nil
}
func collectClientsAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
ch := make(chan peer.AddrInfo)
sub := t.SyncClient.MustSubscribe(ctx, clientsAddrsTopic, ch)
addrs := make([]peer.AddrInfo, 0, clients)
for i := 0; i < clients; i++ {
select {
case a := <-ch:
addrs = append(addrs, a)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err)
}
}
return addrs, nil
}
func startDeal(ctx context.Context, minerActorAddr address.Address, client *impl.FullNodeAPI, fcid cid.Cid) *cid.Cid { func startDeal(ctx context.Context, minerActorAddr address.Address, client *impl.FullNodeAPI, fcid cid.Cid) *cid.Cid {
addr, err := client.WalletDefaultAddress(ctx) addr, err := client.WalletDefaultAddress(ctx)
if err != nil { if err != nil {
@ -243,7 +212,7 @@ loop:
} }
} }
func retrieve(t *TestEnvironment, ctx context.Context, err error, client *impl.FullNodeAPI, fcid cid.Cid, carExport bool, data []byte) { func retrieveData(t *TestEnvironment, ctx context.Context, err error, client *impl.FullNodeAPI, fcid cid.Cid, carExport bool, data []byte) {
offers, err := client.ClientFindData(ctx, fcid) offers, err := client.ClientFindData(ctx, fcid)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -47,7 +47,7 @@ import (
) )
func init() { func init() {
logging.SetLogLevel("*", "ERROR") logging.SetLogLevel("*", "WARN")
os.Setenv("BELLMAN_NO_GPU", "1") os.Setenv("BELLMAN_NO_GPU", "1")
@ -73,6 +73,7 @@ var (
stateReady = sync.State("ready") stateReady = sync.State("ready")
stateDone = sync.State("done") stateDone = sync.State("done")
stateMineNext = sync.State("mine-next")
stateStopMining = sync.State("stop-mining") stateStopMining = sync.State("stop-mining")
) )
@ -601,3 +602,37 @@ func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error
return nil, fmt.Errorf("error while waiting for genesis msg: %w", err) return nil, fmt.Errorf("error while waiting for genesis msg: %w", err)
} }
} }
func collectMinerAddrs(t *TestEnvironment, ctx context.Context, miners int) ([]MinerAddresses, error) {
ch := make(chan MinerAddresses)
sub := t.SyncClient.MustSubscribe(ctx, minersAddrsTopic, ch)
addrs := make([]MinerAddresses, 0, miners)
for i := 0; i < miners; i++ {
select {
case a := <-ch:
addrs = append(addrs, a)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for miners addrs: %w", err)
}
}
return addrs, nil
}
func collectClientAddrs(t *TestEnvironment, ctx context.Context, clients int) ([]peer.AddrInfo, error) {
ch := make(chan peer.AddrInfo)
sub := t.SyncClient.MustSubscribe(ctx, clientsAddrsTopic, ch)
addrs := make([]peer.AddrInfo, 0, clients)
for i := 0; i < clients; i++ {
select {
case a := <-ch:
addrs = append(addrs, a)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for clients addrs: %w", err)
}
}
return addrs, nil
}