// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. package storage import ( "bytes" "context" "crypto/rand" "errors" "fmt" "io/ioutil" "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/localstore" ) var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9") type mockNetFetcher struct { peers *sync.Map sources []*enode.ID peersPerRequest [][]Address requestCalled bool offerCalled bool quit <-chan struct{} ctx context.Context hopCounts []uint8 mu sync.Mutex } func (m *mockNetFetcher) Offer(source *enode.ID) { m.offerCalled = true m.sources = append(m.sources, source) } func (m *mockNetFetcher) Request(hopCount uint8) { m.mu.Lock() defer m.mu.Unlock() m.requestCalled = true var peers []Address m.peers.Range(func(key interface{}, _ interface{}) bool { peers = append(peers, common.FromHex(key.(string))) return true }) m.peersPerRequest = append(m.peersPerRequest, peers) m.hopCounts = append(m.hopCounts, hopCount) } type mockNetFetchFuncFactory struct { fetcher *mockNetFetcher } func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher { m.fetcher.peers = peers m.fetcher.quit = ctx.Done() m.fetcher.ctx = ctx return m.fetcher } func newTestNetStore(t *testing.T) (netStore *NetStore, fetcher *mockNetFetcher, cleanup func()) { t.Helper() dir, err := ioutil.TempDir("", "swarm-storage-") if err != nil { t.Fatal(err) } localStore, err := localstore.New(dir, make([]byte, 32), nil) if err != nil { os.RemoveAll(dir) t.Fatal(err) } cleanup = func() { localStore.Close() os.RemoveAll(dir) } fetcher = new(mockNetFetcher) mockNetFetchFuncFactory := &mockNetFetchFuncFactory{ fetcher: fetcher, } netStore, err = NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher) if err != nil { cleanup() t.Fatal(err) } return netStore, fetcher, cleanup } // TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put. // After the Put there should no active fetchers, and the context created for the fetcher should // be cancelled. func TestNetStoreGetAndPut(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get putErrC := make(chan error) go func() { <-c // wait for the Get to be called time.Sleep(200 * time.Millisecond) // and a little more so it is surely called // check if netStore created a fetcher in the Get call for the unavailable chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { putErrC <- errors.New("Expected netStore to use a fetcher for the Get call") return } _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { putErrC <- fmt.Errorf("Expected no err got %v", err) return } putErrC <- nil }() close(c) recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) // this is blocked until the Put above is done if err != nil { t.Fatalf("Expected no err got %v", err) } if err := <-putErrC; err != nil { t.Fatal(err) } // the retrieved chunk should be the same as what we Put if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { t.Fatalf("Different chunk received than what was put") } // the chunk is already available locally, so there should be no active fetchers waiting for it if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after delivery") } // A fetcher was created when the Get was called (and the chunk was not available). The chunk // was delivered with the Put call, so the fetcher should be cancelled now. select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreGetAndPut tests calling NetStore.Put and then NetStore.Get. // After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore, // there is no need to create fetchers. func TestNetStoreGetAfterPut(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() // First we Put the chunk, so the chunk will be available locally _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } // Get should retrieve the chunk from LocalStore, without creating fetcher recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) if err != nil { t.Fatalf("Expected no err got %v", err) } // the retrieved chunk should be the same as what we Put if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { t.Fatalf("Different chunk received than what was put") } // no fetcher offer or request should be created for a locally available chunk if fetcher.offerCalled || fetcher.requestCalled { t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called") } // no fetchers should be created for a locally available chunk if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to not have fetcher") } } // TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout func TestNetStoreGetTimeout(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() c := make(chan struct{}) // this channel ensures that the gouroutine does not run earlier than the Get fetcherErrC := make(chan error) go func() { <-c // wait for the Get to be called time.Sleep(200 * time.Millisecond) // and a little more so it is surely called // check if netStore created a fetcher in the Get call for the unavailable chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") return } fetcherErrC <- nil }() close(c) // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will // be a timeout _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) // Check if the timeout happened if err != context.DeadlineExceeded { t.Fatalf("Expected context.DeadLineExceeded err got %v", err) } if err := <-fetcherErrC; err != nil { t.Fatal(err) } // A fetcher was created, check if it has been removed after timeout if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after timeout") } // Check if the fetcher context has been cancelled after the timeout select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks // the errors func TestNetStoreGetCancel(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get fetcherErrC := make(chan error, 1) go func() { <-c // wait for the Get to be called time.Sleep(200 * time.Millisecond) // and a little more so it is surely called // check if netStore created a fetcher in the Get call for the unavailable chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") return } fetcherErrC <- nil cancel() }() close(c) // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) if err := <-fetcherErrC; err != nil { t.Fatal(err) } // After the context is cancelled above Get should return with an error if err != context.Canceled { t.Fatalf("Expected context.Canceled err got %v", err) } // A fetcher was created, check if it has been removed after cancel if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after cancel") } // Check if the fetcher context has been cancelled after the request context cancel select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is // delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher // for the chunk retrieval func TestNetStoreMultipleGetAndPut(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() putErrC := make(chan error) go func() { // sleep to make sure Put is called after all the Get time.Sleep(500 * time.Millisecond) // check if netStore created exactly one fetcher for all Get calls if netStore.fetchers.Len() != 1 { putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls") return } _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { putErrC <- fmt.Errorf("Expected no err got %v", err) return } putErrC <- nil }() count := 4 // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above. errC := make(chan error) for i := 0; i < count; i++ { go func() { recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) if err != nil { errC <- fmt.Errorf("Expected no err got %v", err) } if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { errC <- errors.New("Different chunk received than what was put") } errC <- nil }() } if err := <-putErrC; err != nil { t.Fatal(err) } timeout := time.After(1 * time.Second) // The Get calls should return after Put, so no timeout expected for i := 0; i < count; i++ { select { case err := <-errC: if err != nil { t.Fatal(err) } case <-timeout: t.Fatalf("Timeout waiting for Get calls to return") } } // A fetcher was created, check if it has been removed after cancel if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after delivery") } // A fetcher was created, check if it has been removed after delivery select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout func TestNetStoreFetchFuncTimeout(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() // FetchFunc is called for an unavaible chunk, so the returned wait function should not be nil wait := netStore.FetchFunc(ctx, chunk.Address()) if wait == nil { t.Fatal("Expected wait function to be not nil") } // There should an active fetcher for the chunk after the FetchFunc call if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } // wait function should timeout because we don't deliver the chunk with a Put err := wait(ctx) if err != context.DeadlineExceeded { t.Fatalf("Expected context.DeadLineExceeded err got %v", err) } // the fetcher should be removed after timeout if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after timeout") } // the fetcher context should be cancelled after timeout select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk func TestNetStoreFetchFuncAfterPut(t *testing.T) { netStore, _, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() // We deliver the created the chunk with a Put _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } // FetchFunc should return nil, because the chunk is available locally, no need to fetch it wait := netStore.FetchFunc(ctx, ch.Address()) if wait != nil { t.Fatal("Expected wait to be nil") } // No fetchers should be created at all if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to not have fetcher") } } // TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk func TestNetStoreGetCallsRequest(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx := context.WithValue(context.Background(), "hopcount", uint8(5)) ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) defer cancel() // We call get for a not available chunk, it will timeout because the chunk is not delivered _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) if err != context.DeadlineExceeded { t.Fatalf("Expected context.DeadlineExceeded err got %v", err) } // NetStore should call NetFetcher.Request and wait for the chunk if !fetcher.requestCalled { t.Fatal("Expected NetFetcher.Request to be called") } if fetcher.hopCounts[0] != 5 { t.Fatalf("Expected NetFetcher.Request be called with hopCount 5, got %v", fetcher.hopCounts[0]) } } // TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk // in case of a source peer provided in the context. func TestNetStoreGetCallsOffer(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) // If a source peer is added to the context, NetStore will handle it as an offer ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) defer cancel() // We call get for a not available chunk, it will timeout because the chunk is not delivered _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) if err != context.DeadlineExceeded { t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) } // NetStore should call NetFetcher.Offer with the source peer if !fetcher.offerCalled { t.Fatal("Expected NetFetcher.Request to be called") } if len(fetcher.sources) != 1 { t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources)) } if fetcher.sources[0].String() != sourcePeerID.String() { t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0]) } } // TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context. // There is no Put call, so the Get calls timeout func TestNetStoreFetcherCountPeers(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() addr := randomAddr() peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()} ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() errC := make(chan error) nrGets := 3 // Call Get 3 times with a peer in context for i := 0; i < nrGets; i++ { peer := peers[i] go func() { ctx := context.WithValue(ctx, "peer", peer) _, err := netStore.Get(ctx, chunk.ModeGetRequest, addr) errC <- err }() } // All 3 Get calls should timeout for i := 0; i < nrGets; i++ { err := <-errC if err != context.DeadlineExceeded { t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err) } } // fetcher should be closed after timeout select { case <-fetcher.quit: case <-time.After(3 * time.Second): t.Fatalf("mockNetFetcher not closed after timeout") } // All 3 peers should be given to NetFetcher after the 3 Get calls if len(fetcher.peersPerRequest) != nrGets { t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest)) } for i, peers := range fetcher.peersPerRequest { if len(peers) < i+1 { t.Fatalf("Expected at least %v got %v", i+1, len(peers)) } } } // TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times, // and checks there is still exactly one fetcher for one chunk. Afthe chunk is delivered, it checks // if the fetcher is closed. func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() ch := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() // FetchFunc should return a non-nil wait function, because the chunk is not available wait := netStore.FetchFunc(ctx, ch.Address()) if wait == nil { t.Fatal("Expected wait function to be not nil") } // There should be exactly one fetcher for the chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } // Call wait three times in parallel count := 3 errC := make(chan error) for i := 0; i < count; i++ { go func() { errC <- wait(ctx) }() } // sleep a little so the wait functions are called above time.Sleep(100 * time.Millisecond) // there should be still only one fetcher, because all wait calls are for the same chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { t.Fatal("Expected netStore to have one fetcher for the requested chunk") } // Deliver the chunk with a Put _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } // wait until all wait calls return (because the chunk is delivered) for i := 0; i < count; i++ { err := <-errC if err != nil { t.Fatal(err) } } // There should be no more fetchers for the delivered chunk if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after delivery") } // The context for the fetcher should be cancelled after delivery select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } // TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes, // the only difference is that we don't deilver the chunk, just wait for timeout func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { netStore, fetcher, cleanup := newTestNetStore(t) defer cleanup() chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() // FetchFunc should return a non-nil wait function, because the chunk is not available wait := netStore.FetchFunc(ctx, chunk.Address()) if wait == nil { t.Fatal("Expected wait function to be not nil") } // There should be exactly one fetcher for the chunk if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { t.Fatalf("Expected netStore to have one fetcher for the requested chunk") } // Call wait three times in parallel count := 3 errC := make(chan error) for i := 0; i < count; i++ { go func() { rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer rcancel() err := wait(rctx) if err != context.DeadlineExceeded { errC <- fmt.Errorf("Expected err %v got %v", context.DeadlineExceeded, err) return } errC <- nil }() } // wait until all wait calls timeout for i := 0; i < count; i++ { err := <-errC if err != nil { t.Fatal(err) } } // There should be no more fetchers after timeout if netStore.fetchers.Len() != 0 { t.Fatal("Expected netStore to remove the fetcher after delivery") } // The context for the fetcher should be cancelled after timeout select { case <-fetcher.ctx.Done(): default: t.Fatal("Expected fetcher context to be cancelled") } } func randomAddr() Address { addr := make([]byte, 32) rand.Read(addr) return Address(addr) }