swarm: Fix T.Fatal inside a goroutine in tests (#18409)
* swarm/storage: fix T.Fatal inside a goroutine * swarm/network/simulation: fix T.Fatal inside a goroutine * swarm/network/stream: fix T.Fatal inside a goroutine * swarm/network/simulation: consistent failures in TestPeerEventsTimeout * swarm/network/simulation: rename sendRunSignal to triggerSimulationRun
This commit is contained in:
		
							parent
							
								
									81f04fa606
								
							
						
					
					
						commit
						d70c4faf20
					
				| @ -81,6 +81,7 @@ func TestPeerEventsTimeout(t *testing.T) { | ||||
| 	events := sim.PeerEvents(ctx, sim.NodeIDs()) | ||||
| 
 | ||||
| 	done := make(chan struct{}) | ||||
| 	errC := make(chan error) | ||||
| 	go func() { | ||||
| 		for e := range events { | ||||
| 			if e.Error == context.Canceled { | ||||
| @ -90,14 +91,16 @@ func TestPeerEventsTimeout(t *testing.T) { | ||||
| 				close(done) | ||||
| 				return | ||||
| 			} else { | ||||
| 				t.Fatal(e.Error) | ||||
| 				errC <- e.Error | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	select { | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Error("no context deadline received") | ||||
| 		t.Fatal("no context deadline received") | ||||
| 	case err := <-errC: | ||||
| 		t.Fatal(err) | ||||
| 	case <-done: | ||||
| 		// all good, context deadline detected
 | ||||
| 	} | ||||
|  | ||||
| @ -73,7 +73,8 @@ func TestSimulationWithHTTPServer(t *testing.T) { | ||||
| 	//this time the timeout should be long enough so that it doesn't kick in too early
 | ||||
| 	ctx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) | ||||
| 	defer cancel2() | ||||
| 	go sendRunSignal(t) | ||||
| 	errC := make(chan error, 1) | ||||
| 	go triggerSimulationRun(t, errC) | ||||
| 	result = sim.Run(ctx, func(ctx context.Context, sim *Simulation) error { | ||||
| 		log.Debug("This run waits for the run signal from `frontend`...") | ||||
| 		//ensure with a Sleep that simulation doesn't terminate before the signal is received
 | ||||
| @ -83,10 +84,13 @@ func TestSimulationWithHTTPServer(t *testing.T) { | ||||
| 	if result.Error != nil { | ||||
| 		t.Fatal(result.Error) | ||||
| 	} | ||||
| 	if err := <-errC; err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	log.Debug("Test terminated successfully") | ||||
| } | ||||
| 
 | ||||
| func sendRunSignal(t *testing.T) { | ||||
| func triggerSimulationRun(t *testing.T, errC chan error) { | ||||
| 	//We need to first wait for the sim HTTP server to start running...
 | ||||
| 	time.Sleep(2 * time.Second) | ||||
| 	//then we can send the signal
 | ||||
| @ -94,16 +98,13 @@ func sendRunSignal(t *testing.T) { | ||||
| 	log.Debug("Sending run signal to simulation: POST /runsim...") | ||||
| 	resp, err := http.Post(fmt.Sprintf("http://localhost%s/runsim", DefaultHTTPSimAddr), "application/json", nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Request failed: %v", err) | ||||
| 		errC <- fmt.Errorf("Request failed: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		err := resp.Body.Close() | ||||
| 		if err != nil { | ||||
| 			log.Error("Error closing response body", "err", err) | ||||
| 		} | ||||
| 	}() | ||||
| 	log.Debug("Signal sent") | ||||
| 	if resp.StatusCode != http.StatusOK { | ||||
| 		t.Fatalf("err %s", resp.Status) | ||||
| 		errC <- fmt.Errorf("err %s", resp.Status) | ||||
| 		return | ||||
| 	} | ||||
| 	errC <- resp.Body.Close() | ||||
| } | ||||
|  | ||||
| @ -19,9 +19,11 @@ package stream | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| @ -500,7 +502,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) | ||||
| 
 | ||||
| 	log.Info("Starting simulation") | ||||
| 	ctx := context.Background() | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { | ||||
| 		nodeIDs := sim.UpNodeIDs() | ||||
| 		//determine the pivot node to be the first node of the simulation
 | ||||
| 		pivot := nodeIDs[0] | ||||
| @ -553,14 +555,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) | ||||
| 		} | ||||
| 		pivotFileStore := item.(*storage.FileStore) | ||||
| 		log.Debug("Starting retrieval routine") | ||||
| 		retErrC := make(chan error) | ||||
| 		go func() { | ||||
| 			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
 | ||||
| 			// we must wait for the peer connections to have started before requesting
 | ||||
| 			n, err := readAll(pivotFileStore, fileHash) | ||||
| 			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("requesting chunks action error: %v", err) | ||||
| 			} | ||||
| 			retErrC <- err | ||||
| 		}() | ||||
| 
 | ||||
| 		log.Debug("Watching for disconnections") | ||||
| @ -570,11 +571,19 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) | ||||
| 			simulation.NewPeerEventsFilter().Drop(), | ||||
| 		) | ||||
| 
 | ||||
| 		var disconnected atomic.Value | ||||
| 		go func() { | ||||
| 			for d := range disconnections { | ||||
| 				if d.Error != nil { | ||||
| 					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 					t.Fatal(d.Error) | ||||
| 					disconnected.Store(true) | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 		defer func() { | ||||
| 			if err != nil { | ||||
| 				if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 					err = errors.New("disconnect events received") | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| @ -595,6 +604,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) | ||||
| 		if !success { | ||||
| 			return fmt.Errorf("Test failed, chunks not available on all nodes") | ||||
| 		} | ||||
| 		if err := <-retErrC; err != nil { | ||||
| 			t.Fatalf("requesting chunks: %v", err) | ||||
| 		} | ||||
| 		log.Debug("Test terminated successfully") | ||||
| 		return nil | ||||
| 	}) | ||||
| @ -675,7 +687,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b | ||||
| 	} | ||||
| 
 | ||||
| 	ctx := context.Background() | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { | ||||
| 		nodeIDs := sim.UpNodeIDs() | ||||
| 		node := nodeIDs[len(nodeIDs)-1] | ||||
| 
 | ||||
| @ -702,11 +714,19 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b | ||||
| 			simulation.NewPeerEventsFilter().Drop(), | ||||
| 		) | ||||
| 
 | ||||
| 		var disconnected atomic.Value | ||||
| 		go func() { | ||||
| 			for d := range disconnections { | ||||
| 				if d.Error != nil { | ||||
| 					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 					b.Fatal(d.Error) | ||||
| 					disconnected.Store(true) | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 		defer func() { | ||||
| 			if err != nil { | ||||
| 				if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 					err = errors.New("disconnect events received") | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| @ -19,9 +19,11 @@ package stream | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/binary" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| @ -117,7 +119,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { | ||||
| 		nodeIDs := sim.UpNodeIDs() | ||||
| 		storer := nodeIDs[0] | ||||
| 		checker := nodeIDs[1] | ||||
| @ -162,11 +164,19 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		var disconnected atomic.Value | ||||
| 		go func() { | ||||
| 			for d := range disconnections { | ||||
| 				if d.Error != nil { | ||||
| 					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 					t.Fatal(d.Error) | ||||
| 					disconnected.Store(true) | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 		defer func() { | ||||
| 			if err != nil { | ||||
| 				if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 					err = errors.New("disconnect events received") | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| @ -21,6 +21,7 @@ import ( | ||||
| 	"os" | ||||
| 	"runtime" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| @ -213,11 +214,13 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { | ||||
| 		simulation.NewPeerEventsFilter().Drop(), | ||||
| 	) | ||||
| 
 | ||||
| 	var disconnected atomic.Value | ||||
| 	go func() { | ||||
| 		for d := range disconnections { | ||||
| 			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 			t.Fatal("unexpected disconnect") | ||||
| 			cancelSimRun() | ||||
| 			if d.Error != nil { | ||||
| 				log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 				disconnected.Store(true) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| @ -226,6 +229,9 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { | ||||
| 	if result.Error != nil { | ||||
| 		t.Fatal(result.Error) | ||||
| 	} | ||||
| 	if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 		t.Fatal("disconnect events received") | ||||
| 	} | ||||
| 	log.Info("Simulation ended") | ||||
| } | ||||
| 
 | ||||
| @ -395,11 +401,13 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) | ||||
| 		simulation.NewPeerEventsFilter().Drop(), | ||||
| 	) | ||||
| 
 | ||||
| 	var disconnected atomic.Value | ||||
| 	go func() { | ||||
| 		for d := range disconnections { | ||||
| 			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 			t.Fatal("unexpected disconnect") | ||||
| 			cancelSimRun() | ||||
| 			if d.Error != nil { | ||||
| 				log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 				disconnected.Store(true) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| @ -514,6 +522,9 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) | ||||
| 		return result.Error | ||||
| 	} | ||||
| 
 | ||||
| 	if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 		t.Fatal("disconnect events received") | ||||
| 	} | ||||
| 	log.Info("Simulation ended") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -18,11 +18,13 @@ package stream | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"math" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| @ -129,7 +131,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { | ||||
| 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { | ||||
| 		nodeIDs := sim.UpNodeIDs() | ||||
| 
 | ||||
| 		nodeIndex := make(map[enode.ID]int) | ||||
| @ -143,11 +145,19 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p | ||||
| 			simulation.NewPeerEventsFilter().Drop(), | ||||
| 		) | ||||
| 
 | ||||
| 		var disconnected atomic.Value | ||||
| 		go func() { | ||||
| 			for d := range disconnections { | ||||
| 				if d.Error != nil { | ||||
| 					log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 					t.Fatal(d.Error) | ||||
| 					disconnected.Store(true) | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 		defer func() { | ||||
| 			if err != nil { | ||||
| 				if yes, ok := disconnected.Load().(bool); ok && yes { | ||||
| 					err = errors.New("disconnect events received") | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| @ -20,6 +20,8 @@ import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"crypto/rand" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| @ -114,19 +116,24 @@ func TestNetStoreGetAndPut(t *testing.T) { | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get
 | ||||
| 	putErrC := make(chan error) | ||||
| 	go func() { | ||||
| 		<-c                                // wait for the Get to be called
 | ||||
| 		time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
 | ||||
| 
 | ||||
| 		// check if netStore created a fetcher in the Get call for the unavailable chunk
 | ||||
| 		if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { | ||||
| 			t.Fatal("Expected netStore to use a fetcher for the Get call") | ||||
| 			putErrC <- errors.New("Expected netStore to use a fetcher for the Get call") | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		err := netStore.Put(ctx, chunk) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Expected no err got %v", err) | ||||
| 			putErrC <- fmt.Errorf("Expected no err got %v", err) | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		putErrC <- nil | ||||
| 	}() | ||||
| 
 | ||||
| 	close(c) | ||||
| @ -134,6 +141,10 @@ func TestNetStoreGetAndPut(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Expected no err got %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := <-putErrC; err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	// the retrieved chunk should be the same as what we Put
 | ||||
| 	if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { | ||||
| 		t.Fatalf("Different chunk received than what was put") | ||||
| @ -200,14 +211,18 @@ func TestNetStoreGetTimeout(t *testing.T) { | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get
 | ||||
| 	fetcherErrC := make(chan error) | ||||
| 	go func() { | ||||
| 		<-c                                // wait for the Get to be called
 | ||||
| 		time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
 | ||||
| 
 | ||||
| 		// check if netStore created a fetcher in the Get call for the unavailable chunk
 | ||||
| 		if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { | ||||
| 			t.Fatal("Expected netStore to use a fetcher for the Get call") | ||||
| 			fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		fetcherErrC <- nil | ||||
| 	}() | ||||
| 
 | ||||
| 	close(c) | ||||
| @ -220,6 +235,10 @@ func TestNetStoreGetTimeout(t *testing.T) { | ||||
| 		t.Fatalf("Expected context.DeadLineExceeded err got %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := <-fetcherErrC; err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	// A fetcher was created, check if it has been removed after timeout
 | ||||
| 	if netStore.fetchers.Len() != 0 { | ||||
| 		t.Fatal("Expected netStore to remove the fetcher after timeout") | ||||
| @ -243,20 +262,29 @@ func TestNetStoreGetCancel(t *testing.T) { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | ||||
| 
 | ||||
| 	c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get
 | ||||
| 	fetcherErrC := make(chan error, 1) | ||||
| 	go func() { | ||||
| 		<-c                                // wait for the Get to be called
 | ||||
| 		time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
 | ||||
| 		// check if netStore created a fetcher in the Get call for the unavailable chunk
 | ||||
| 		if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { | ||||
| 			t.Fatal("Expected netStore to use a fetcher for the Get call") | ||||
| 			fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		fetcherErrC <- nil | ||||
| 		cancel() | ||||
| 	}() | ||||
| 
 | ||||
| 	close(c) | ||||
| 
 | ||||
| 	// We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery
 | ||||
| 	_, err := netStore.Get(ctx, chunk.Address()) | ||||
| 
 | ||||
| 	if err := <-fetcherErrC; err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	// After the context is cancelled above Get should return with an error
 | ||||
| 	if err != context.Canceled { | ||||
| 		t.Fatalf("Expected context.Canceled err got %v", err) | ||||
| @ -286,46 +314,55 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) { | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	putErrC := make(chan error) | ||||
| 	go func() { | ||||
| 		// sleep to make sure Put is called after all the Get
 | ||||
| 		time.Sleep(500 * time.Millisecond) | ||||
| 		// check if netStore created exactly one fetcher for all Get calls
 | ||||
| 		if netStore.fetchers.Len() != 1 { | ||||
| 			t.Fatal("Expected netStore to use one fetcher for all Get calls") | ||||
| 			putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls") | ||||
| 			return | ||||
| 		} | ||||
| 		err := netStore.Put(ctx, chunk) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Expected no err got %v", err) | ||||
| 			putErrC <- fmt.Errorf("Expected no err got %v", err) | ||||
| 			return | ||||
| 		} | ||||
| 		putErrC <- nil | ||||
| 	}() | ||||
| 
 | ||||
| 	count := 4 | ||||
| 	// call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above.
 | ||||
| 	getWG := sync.WaitGroup{} | ||||
| 	for i := 0; i < 4; i++ { | ||||
| 		getWG.Add(1) | ||||
| 	errC := make(chan error) | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		go func() { | ||||
| 			defer getWG.Done() | ||||
| 			recChunk, err := netStore.Get(ctx, chunk.Address()) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Expected no err got %v", err) | ||||
| 				errC <- fmt.Errorf("Expected no err got %v", err) | ||||
| 			} | ||||
| 			if !bytes.Equal(recChunk.Address(), chunk.Address()) || !bytes.Equal(recChunk.Data(), chunk.Data()) { | ||||
| 				t.Fatalf("Different chunk received than what was put") | ||||
| 				errC <- errors.New("Different chunk received than what was put") | ||||
| 			} | ||||
| 			errC <- nil | ||||
| 		}() | ||||
| 	} | ||||
| 
 | ||||
| 	finishedC := make(chan struct{}) | ||||
| 	go func() { | ||||
| 		getWG.Wait() | ||||
| 		close(finishedC) | ||||
| 	}() | ||||
| 	if err := <-putErrC; err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	timeout := time.After(1 * time.Second) | ||||
| 
 | ||||
| 	// The Get calls should return after Put, so no timeout expected
 | ||||
| 	select { | ||||
| 	case <-finishedC: | ||||
| 	case <-time.After(1 * time.Second): | ||||
| 		t.Fatalf("Timeout waiting for Get calls to return") | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		select { | ||||
| 		case err := <-errC: | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		case <-timeout: | ||||
| 			t.Fatalf("Timeout waiting for Get calls to return") | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// A fetcher was created, check if it has been removed after cancel
 | ||||
| @ -448,7 +485,7 @@ func TestNetStoreGetCallsOffer(t *testing.T) { | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	// We call get for a not available chunk, it will timeout because the chunk is not delivered
 | ||||
| 	chunk, err := netStore.Get(ctx, chunk.Address()) | ||||
| 	_, err := netStore.Get(ctx, chunk.Address()) | ||||
| 
 | ||||
| 	if err != context.DeadlineExceeded { | ||||
| 		t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) | ||||
| @ -542,16 +579,12 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { | ||||
| 		t.Fatalf("Expected netStore to have one fetcher for the requested chunk") | ||||
| 	} | ||||
| 
 | ||||
| 	// Call wait three times parallelly
 | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		wg.Add(1) | ||||
| 	// Call wait three times in parallel
 | ||||
| 	count := 3 | ||||
| 	errC := make(chan error) | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		go func() { | ||||
| 			err := wait(ctx) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Expected no err got %v", err) | ||||
| 			} | ||||
| 			wg.Done() | ||||
| 			errC <- wait(ctx) | ||||
| 		}() | ||||
| 	} | ||||
| 
 | ||||
| @ -570,7 +603,12 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { | ||||
| 	} | ||||
| 
 | ||||
| 	// wait until all wait calls return (because the chunk is delivered)
 | ||||
| 	wg.Wait() | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		err := <-errC | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// There should be no more fetchers for the delivered chunk
 | ||||
| 	if netStore.fetchers.Len() != 0 { | ||||
| @ -606,23 +644,29 @@ func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { | ||||
| 		t.Fatalf("Expected netStore to have one fetcher for the requested chunk") | ||||
| 	} | ||||
| 
 | ||||
| 	// Call wait three times parallelly
 | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		wg.Add(1) | ||||
| 	// Call wait three times in parallel
 | ||||
| 	count := 3 | ||||
| 	errC := make(chan error) | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
| 			rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) | ||||
| 			defer rcancel() | ||||
| 			err := wait(rctx) | ||||
| 			if err != context.DeadlineExceeded { | ||||
| 				t.Fatalf("Expected err %v got %v", context.DeadlineExceeded, err) | ||||
| 				errC <- fmt.Errorf("Expected err %v got %v", context.DeadlineExceeded, err) | ||||
| 				return | ||||
| 			} | ||||
| 			errC <- nil | ||||
| 		}() | ||||
| 	} | ||||
| 
 | ||||
| 	// wait until all wait calls timeout
 | ||||
| 	wg.Wait() | ||||
| 	for i := 0; i < count; i++ { | ||||
| 		err := <-errC | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// There should be no more fetchers after timeout
 | ||||
| 	if netStore.fetchers.Len() != 0 { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user