extract subscriptions

This commit is contained in:
Anton Evangelatov 2020-06-24 13:10:35 +02:00
parent 448bbf3710
commit ebd0e85d65

View File

@ -107,30 +107,22 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
// the first duty of the boostrapper is to construct the genesis block // the first duty of the boostrapper is to construct the genesis block
// first collect all client and miner balances to assign initial funds // first collect all client and miner balances to assign initial funds
balanceMsgs := make([]*InitialBalanceMsg, 0, nodes) balances, err := waitForBalances(t, ctx, nodes)
balanceCh := make(chan *InitialBalanceMsg) if err != nil {
return nil, err
t.SyncClient.MustSubscribe(ctx, balanceTopic, balanceCh)
for i := 0; i < nodes; i++ {
m := <-balanceCh
balanceMsgs = append(balanceMsgs, m)
} }
// then collect all preseals from miners // then collect all preseals from miners
presealMsgs := make([]*PresealMsg, 0, miners) preseals, err := collectPreseals(t, ctx, miners)
presealCh := make(chan *PresealMsg) if err != nil {
return nil, err
t.SyncClient.MustSubscribe(ctx, presealTopic, presealCh)
for i := 0; i < miners; i++ {
m := <-presealCh
presealMsgs = append(presealMsgs, m)
} }
// now construct the genesis block // now construct the genesis block
var genesisActors []genesis.Actor var genesisActors []genesis.Actor
var genesisMiners []genesis.Miner var genesisMiners []genesis.Miner
for _, bm := range balanceMsgs { for _, bm := range balances {
genesisActors = append(genesisActors, genesisActors = append(genesisActors,
genesis.Actor{ genesis.Actor{
Type: genesis.TAccount, Type: genesis.TAccount,
@ -139,7 +131,7 @@ func prepareBootstrapper(t *TestEnvironment) (*Node, error) {
}) })
} }
for _, pm := range presealMsgs { for _, pm := range preseals {
genesisMiners = append(genesisMiners, pm.Miner) genesisMiners = append(genesisMiners, pm.Miner)
} }
@ -272,9 +264,10 @@ func prepareMiner(t *TestEnvironment) (*Node, error) {
t.SyncClient.Publish(ctx, presealTopic, presealMsg) t.SyncClient.Publish(ctx, presealTopic, presealMsg)
// then collect the genesis block and bootstrapper address // then collect the genesis block and bootstrapper address
genesisCh := make(chan *GenesisMsg) genesisMsg, err := waitForGenesis(t, ctx)
t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) if err != nil {
genesisMsg := <-genesisCh return nil, err
}
// prepare the repo // prepare the repo
minerRepo := repo.NewMemory(nil) minerRepo := repo.NewMemory(nil)
@ -425,9 +418,10 @@ func prepareClient(t *TestEnvironment) (*Node, error) {
t.SyncClient.Publish(ctx, balanceTopic, balanceMsg) t.SyncClient.Publish(ctx, balanceTopic, balanceMsg)
// then collect the genesis block and bootstrapper address // then collect the genesis block and bootstrapper address
genesisCh := make(chan *GenesisMsg) genesisMsg, err := waitForGenesis(t, ctx)
t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh) if err != nil {
genesisMsg := <-genesisCh return nil, err
}
clientIP := t.NetClient.MustGetDataNetworkIP().String() clientIP := t.NetClient.MustGetDataNetworkIP().String()
@ -516,3 +510,49 @@ func withMinerListenAddress(ip string) node.Option {
addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)} addrs := []string{fmt.Sprintf("/ip4/%s/tcp/4002", ip)}
return node.Override(node.StartListeningKey, lp2p.StartListening(addrs)) return node.Override(node.StartListeningKey, lp2p.StartListening(addrs))
} }
func waitForBalances(t *TestEnvironment, ctx context.Context, nodes int) ([]*InitialBalanceMsg, error) {
ch := make(chan *InitialBalanceMsg)
sub := t.SyncClient.MustSubscribe(ctx, balanceTopic, ch)
balances := make([]*InitialBalanceMsg, 0, nodes)
for i := 0; i < nodes; i++ {
select {
case m := <-ch:
balances = append(balances, m)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for balances: %w", err)
}
}
return balances, nil
}
func collectPreseals(t *TestEnvironment, ctx context.Context, miners int) ([]*PresealMsg, error) {
ch := make(chan *PresealMsg)
sub := t.SyncClient.MustSubscribe(ctx, presealTopic, ch)
preseals := make([]*PresealMsg, 0, miners)
for i := 0; i < miners; i++ {
select {
case m := <-ch:
preseals = append(preseals, m)
case err := <-sub.Done():
return nil, fmt.Errorf("got error while waiting for preseals: %w", err)
}
}
return preseals, nil
}
func waitForGenesis(t *TestEnvironment, ctx context.Context) (*GenesisMsg, error) {
genesisCh := make(chan *GenesisMsg)
sub := t.SyncClient.MustSubscribe(ctx, genesisTopic, genesisCh)
select {
case genesisMsg := <-genesisCh:
return genesisMsg, nil
case err := <-sub.Done():
return nil, fmt.Errorf("error while waiting for genesis msg: %w", err)
}
}