swarm/network/stream: Fix flaky tests in GetSubscriptionsRPC test (#19227)

* swarm/network/stream: fixed timing issues

* swarm/network/stream: only count first iteration of subscriptions

* swarm/network/stream/: fix linter errors
This commit is contained in:
holisticode 2019-03-07 03:24:28 -05:00 committed by Viktor Trón
parent 72b21db2d3
commit a87776a5fe

View File

@ -1207,7 +1207,12 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription // we use this subscriptionFunc for this test: just increases count and calls the actual subscription
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
expectedMsgCount.inc() // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration
// in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check
// this avoids flakyness
if len(subs) == 0 {
expectedMsgCount.inc()
}
doRequestSubscription(r, p, bin, subs) doRequestSubscription(r, p, bin, subs)
return true return true
} }
@ -1245,19 +1250,19 @@ func TestGetSubscriptionsRPC(t *testing.T) {
ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun() defer cancelSimRun()
// setup the filter for SubscribeMsg
msgs := sim.PeerEvents(
context.Background(),
sim.UpNodeIDs(),
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
)
// upload a snapshot // upload a snapshot
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// setup the filter for SubscribeMsg
msgs := sim.PeerEvents(
context.Background(),
sim.NodeIDs(),
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
)
// strategy: listen to all SubscribeMsg events; after every event we wait // strategy: listen to all SubscribeMsg events; after every event we wait
// if after `waitDuration` no more messages are being received, we assume the // if after `waitDuration` no more messages are being received, we assume the
// subscription phase has terminated! // subscription phase has terminated!
@ -1267,9 +1272,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// any new subscriptions any more // any new subscriptions any more
go func() { go func() {
//for long running sims, waiting 1 sec will not be enough //for long running sims, waiting 1 sec will not be enough
waitDuration := time.Duration(nodeCount/16) * time.Second waitDuration := 1 * time.Second
if *longrunning { if *longrunning {
waitDuration = syncUpdateDelay waitDuration = 3 * time.Second
} }
for { for {
select { select {
@ -1335,8 +1340,10 @@ func TestGetSubscriptionsRPC(t *testing.T) {
log.Debug("All node streams counted", "realCount", realCount) log.Debug("All node streams counted", "realCount", realCount)
} }
emc := expectedMsgCount.count() emc := expectedMsgCount.count()
if realCount != emc { // after a subscription request, internally a live AND a history stream will be subscribed,
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount, emc) // thus the real count should be half of the actual request subscriptions sent
if realCount/2 != emc {
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, emc)
} }
return nil return nil
}) })