swarm/network: refactor simulation tests bootstrap (#18975)

This commit is contained in:
Anton Evangelatov 2019-02-01 09:58:46 +01:00 committed by Viktor Trón
parent a89170cfb2
commit 597597e8b2
9 changed files with 155 additions and 246 deletions

View File

@ -26,16 +26,19 @@ import (
"math/rand" "math/rand"
"os" "os"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing" p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil" "github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable" colorable "github.com/mattn/go-colorable"
) )
@ -66,6 +69,80 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
} }
// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
return addr, netStore, delivery, cleanup, nil
}
// newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
return netStore, delivery, cleanup, nil
}
// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
return addr, netStore, delivery, cleanup, nil
}
func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
n := ctx.Config.Node()
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if *useMockStore {
store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
}
if err != nil {
return nil, nil, nil, err
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, err
}
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
bucket.Store(bucketKeyStore, store)
bucket.Store(bucketKeyDB, netStore)
bucket.Store(bucketKeyDelivery, delivery)
bucket.Store(bucketKeyFileStore, fileStore)
cleanup := func() {
netStore.Close()
os.RemoveAll(datadir)
}
return netStore, delivery, cleanup, nil
}
func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup // setup
addr := network.RandomAddr() // tested peers peer address addr := network.RandomAddr() // tested peers peer address

View File

@ -21,7 +21,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -457,26 +456,10 @@ func TestDeliveryFromNodes(t *testing.T) {
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node() addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck, SkipCheck: skipCheck,
@ -485,11 +468,12 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
}, nil) }, nil)
bucket.Store(bucketKeyRegistry, r) bucket.Store(bucketKeyRegistry, r)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) cleanup = func() {
bucket.Store(bucketKeyFileStore, fileStore) r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()
@ -644,25 +628,10 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
node := ctx.Config.Node() addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
addr := network.NewAddr(node)
store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck, SkipCheck: skipCheck,
@ -670,12 +639,14 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0, SyncUpdateDelay: 0,
}, nil) }, nil)
bucket.Store(bucketKeyRegistry, r)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) cleanup = func() {
bucket.Store(bucketKeyFileStore, fileStore) r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()

View File

@ -21,7 +21,6 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -31,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
@ -62,26 +60,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
externalStreamMaxKeys := uint64(100) externalStreamMaxKeys := uint64(100)
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
n := ctx.Config.Node() addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
@ -97,11 +80,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
}) })
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) cleanup := func() {
bucket.Store(bucketKeyFileStore, fileStore) r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()

View File

@ -18,7 +18,6 @@ package stream
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -27,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
@ -105,26 +103,11 @@ func TestRetrieval(t *testing.T) {
} }
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
"streamer": retrievalStreamerFunc, "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
} addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled, Retrieval: RetrievalEnabled,
@ -132,16 +115,13 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
SyncUpdateDelay: 3 * time.Second, SyncUpdateDelay: 3 * time.Second,
}, nil) }, nil)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() { cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close() r.Close()
clean()
} }
return r, cleanup, nil return r, cleanup, nil
},
} }
/* /*

View File

@ -107,25 +107,11 @@ func TestSyncingViaGlobalSync(t *testing.T) {
} }
var simServiceMap = map[string]simulation.ServiceFunc{ var simServiceMap = map[string]simulation.ServiceFunc{
"streamer": streamerFunc, "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
} addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node()
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
@ -136,13 +122,12 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
bucket.Store(bucketKeyRegistry, r) bucket.Store(bucketKeyRegistry, r)
cleanup = func() { cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close() r.Close()
clean()
} }
return r, cleanup, nil return r, cleanup, nil
},
} }
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {

View File

@ -21,7 +21,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"golang.org/x/crypto/sha3" "golang.org/x/crypto/sha3"
) )
@ -1209,26 +1207,18 @@ func TestGetSubscriptionsRPC(t *testing.T) {
// create a standard sim // create a standard sim
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node() addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
// configure so that sync registrations actually happen // configure so that sync registrations actually happen
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalEnabled, Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe, //enable sync registrations Syncing: SyncingAutoSubscribe, //enable sync registrations
SyncUpdateDelay: syncUpdateDelay, SyncUpdateDelay: syncUpdateDelay,
}, nil) }, nil)
// get the SubscribeMsg code // get the SubscribeMsg code
subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{}) subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
if !ok { if !ok {
@ -1236,13 +1226,11 @@ func TestGetSubscriptionsRPC(t *testing.T) {
} }
cleanup = func() { cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close() r.Close()
clean()
} }
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()
@ -1322,9 +1310,9 @@ func TestGetSubscriptionsRPC(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
//length of the subscriptions can not be smaller than number of peers //length of the subscriptions can not be smaller than number of peers
log.Debug("node subscriptions:", "node", node.String()) log.Debug("node subscriptions", "node", node.String())
for p, ps := range pstreams { for p, ps := range pstreams {
log.Debug("... with: ", "peer", p) log.Debug("... with", "peer", p)
for _, s := range ps { for _, s := range ps {
log.Debug(".......", "stream", s) log.Debug(".......", "stream", s)
// each node also has subscriptions to RETRIEVE_REQUEST streams, // each node also has subscriptions to RETRIEVE_REQUEST streams,

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math" "math"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -38,7 +37,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/ethereum/go-ethereum/swarm/storage/mock"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil" "github.com/ethereum/go-ethereum/swarm/testutil"
) )
@ -73,38 +71,14 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
var store storage.ChunkStore addr := network.NewAddr(ctx.Config.Node())
var datadir string
node := ctx.Config.Node()
addr := network.NewAddr(node)
//hack to put addresses in same space //hack to put addresses in same space
addr.OAddr[0] = byte(0) addr.OAddr[0] = byte(0)
if *useMockStore { netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
store, datadir, err = createMockStore(mockmem.NewGlobalStore(), node.ID(), addr)
} else {
store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
}
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
@ -112,11 +86,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
SkipCheck: skipCheck, SkipCheck: skipCheck,
}, nil) }, nil)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) cleanup = func() {
bucket.Store(bucketKeyFileStore, fileStore) r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()
@ -251,44 +226,26 @@ func TestSameVersionID(t *testing.T) {
v := uint(1) v := uint(1)
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
var store storage.ChunkStore addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
var datadir string
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe, Syncing: SyncingAutoSubscribe,
}, nil) }, nil)
bucket.Store(bucketKeyRegistry, r)
//assign to each node the same version ID //assign to each node the same version ID
r.spec.Version = v r.spec.Version = v
bucket.Store(bucketKeyRegistry, r) cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()
@ -333,46 +290,27 @@ func TestDifferentVersionID(t *testing.T) {
v := uint(0) v := uint(0)
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
var store storage.ChunkStore addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
var datadir string
node := ctx.Config.Node()
addr := network.NewAddr(node)
store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
cleanup = func() {
store.Close()
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe, Syncing: SyncingAutoSubscribe,
}, nil) }, nil)
bucket.Store(bucketKeyRegistry, r)
//increase the version ID for each node //increase the version ID for each node
v++ v++
r.spec.Version = v r.spec.Version = v
bucket.Store(bucketKeyRegistry, r) cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil return r, cleanup, nil
}, },
}) })
defer sim.Close() defer sim.Close()

View File

@ -24,7 +24,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -37,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
@ -169,21 +167,10 @@ func TestSnapshotSyncWithServer(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{ sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
n := ctx.Config.Node() addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers)
addr := network.NewAddr(n)
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
bucket.Store(bucketKeyStore, store)
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled, Retrieval: RetrievalDisabled,
@ -199,9 +186,8 @@ func TestSnapshotSyncWithServer(t *testing.T) {
bucket.Store(bucketKeyRegistry, tr) bucket.Store(bucketKeyRegistry, tr)
cleanup = func() { cleanup = func() {
netStore.Close()
tr.Close() tr.Close()
os.RemoveAll(datadir) clean()
} }
return tr, cleanup, nil return tr, cleanup, nil