From ebd0e85d6586de7299ed08a6afc3b34302ef2a5f Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 24 Jun 2020 13:10:35 +0200 Subject: [PATCH] extract subscriptions --- lotus-soup/node.go | 84 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/lotus-soup/node.go b/lotus-soup/node.go index ee17e981a..375d25682 100644 --- a/lotus-soup/node.go +++ b/lotus-soup/node.go @@ -107,30 +107,22 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) { // the first duty of the boostrapper is to construct the genesis block // first collect all client and miner balances to assign initial funds - balanceMsgs := make([]*InitialBalanceMsg, 0, nodes) - balanceCh := make(chan *InitialBalanceMsg) - - t.SyncClient.MustSubscribe(ctx, balanceTopic, balanceCh) - for i := 0; i < nodes; i++ { - m := <-balanceCh - balanceMsgs = append(balanceMsgs, m) + balances, err := waitForBalances(t, ctx, nodes) + if err != nil { + return nil, err } // then collect all preseals from miners - presealMsgs := make([]*PresealMsg, 0, miners) - presealCh := make(chan *PresealMsg) - - t.SyncClient.MustSubscribe(ctx, presealTopic, presealCh) - for i := 0; i < miners; i++ { - m := <-presealCh - presealMsgs = append(presealMsgs, m) + preseals, err := collectPreseals(t, ctx, miners) + if err != nil { + return nil, err } // now construct the genesis block var genesisActors []genesis.Actor var genesisMiners []genesis.Miner - for _, bm := range balanceMsgs { + for _, bm := range balances { genesisActors = append(genesisActors, genesis.Actor{ Type: genesis.TAccount, @@ -139,7 +131,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) { }) } - for _, pm := range presealMsgs { + for _, pm := range preseals { genesisMiners = append(genesisMiners, pm.Miner) } @@ -272,9 +264,10 @@ func prepareMiner(t *TestEnvironment) (*Node, error) { t.SyncClient.Publish(ctx, presealTopic, presealMsg) // then collect the genesis block and bootstrapper address - genesisCh := make(chan *GenesisMsg) - t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) - genesisMsg := <-genesisCh + genesisMsg, err := waitForGenesis(t, ctx) + if err != nil { + return nil, err + } // prepare the repo minerRepo := repo.NewMemory(nil) @@ -425,9 +418,10 @@ func prepareClient(t *TestEnvironment) (*Node, error) { t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) // then collect the genesis block and bootstrapper address - genesisCh := make(chan *GenesisMsg) - t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) - genesisMsg := <-genesisCh + genesisMsg, err := waitForGenesis(t, ctx) + if err != nil { + return nil, err + } clientIP := t.NetClient.MustGetDataNetworkIP().String() @@ -516,3 +510,49 @@ func withMinerListenAddress(ip string) node.Option { addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)} return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) } + +func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) { + ch := make(chan *InitialBalanceMsg) + sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch) + + balances := make([]*InitialBalanceMsg, 0, nodes) + for i := 0; i < nodes; i++ { + select { + case m := <-ch: + balances = append(balances, m) + case err := <-sub.Done(): + return nil, fmt.Errorf("got error while waiting for balances: %w", err) + } + } + + return balances, nil +} + +func collectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) { + ch := make(chan *PresealMsg) + sub := t.SyncClient.MustSubscribe(ctx, presealTopic, ch) + + preseals := make([]*PresealMsg, 0, miners) + for i := 0; i < miners; i++ { + select { + case m := <-ch: + preseals = append(preseals, m) + case err := <-sub.Done(): + return nil, fmt.Errorf("got error while waiting for preseals: %w", err) + } + } + + return preseals, nil +} + +func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) { + genesisCh := make(chan *GenesisMsg) + sub := t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) + + select { + case genesisMsg := <-genesisCh: + return genesisMsg, nil + case err := <-sub.Done(): + return nil, fmt.Errorf("error while waiting for genesis msg: %w", err) + } +}