diff --git a/cmd/swarm/global-store/explorer.go b/cmd/swarm/global-store/explorer.go
new file mode 100644
index 000000000..634ff1ebb
--- /dev/null
+++ b/cmd/swarm/global-store/explorer.go
@@ -0,0 +1,66 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see .
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock/explorer"
+ cli "gopkg.in/urfave/cli.v1"
+)
+
+// serveChunkExplorer starts an http server in background with chunk explorer handler
+// using the provided global store. Server is started if the returned shutdown function
+// is not nil.
+func serveChunkExplorer(ctx *cli.Context, globalStore mock.GlobalStorer) (shutdown func(), err error) {
+ if !ctx.IsSet("explorer-address") {
+ return nil, nil
+ }
+
+ corsOrigins := ctx.StringSlice("explorer-cors-origin")
+ server := &http.Server{
+ Handler: explorer.NewHandler(globalStore, corsOrigins),
+ IdleTimeout: 30 * time.Minute,
+ ReadTimeout: 2 * time.Minute,
+ WriteTimeout: 2 * time.Minute,
+ }
+ listener, err := net.Listen("tcp", ctx.String("explorer-address"))
+ if err != nil {
+ return nil, fmt.Errorf("explorer: %v", err)
+ }
+ log.Info("chunk explorer http", "address", listener.Addr().String(), "origins", corsOrigins)
+
+ go func() {
+ if err := server.Serve(listener); err != nil {
+ log.Error("chunk explorer", "err", err)
+ }
+ }()
+
+ return func() {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := server.Shutdown(ctx); err != nil {
+ log.Error("chunk explorer: shutdown", "err", err)
+ }
+ }, nil
+}
diff --git a/cmd/swarm/global-store/explorer_test.go b/cmd/swarm/global-store/explorer_test.go
new file mode 100644
index 000000000..2e4928c8f
--- /dev/null
+++ b/cmd/swarm/global-store/explorer_test.go
@@ -0,0 +1,254 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see .
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "sort"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock/explorer"
+ mockRPC "github.com/ethereum/go-ethereum/swarm/storage/mock/rpc"
+)
+
+// TestExplorer validates basic chunk explorer functionality by storing
+// a small set of chunk and making http requests on exposed endpoint.
+// Full chunk explorer validation is done in mock/explorer package.
+func TestExplorer(t *testing.T) {
+ addr := findFreeTCPAddress(t)
+ explorerAddr := findFreeTCPAddress(t)
+ testCmd := runGlobalStore(t, "ws", "--addr", addr, "--explorer-address", explorerAddr)
+ defer testCmd.Kill()
+
+ client := websocketClient(t, addr)
+
+ store := mockRPC.NewGlobalStore(client)
+ defer store.Close()
+
+ nodeKeys := map[string][]string{
+ "a1": {"b1", "b2", "b3"},
+ "a2": {"b3", "b4", "b5"},
+ }
+
+ keyNodes := make(map[string][]string)
+
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ keyNodes[key] = append(keyNodes[key], addr)
+ }
+ }
+
+ invalidAddr := "c1"
+ invalidKey := "d1"
+
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ err := store.Put(common.HexToAddress(addr), common.Hex2Bytes(key), []byte("data"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ }
+
+ endpoint := "http://" + explorerAddr
+
+ t.Run("has key", func(t *testing.T) {
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ testStatusResponse(t, endpoint+"/api/has-key/"+addr+"/"+key, http.StatusOK)
+ testStatusResponse(t, endpoint+"/api/has-key/"+invalidAddr+"/"+key, http.StatusNotFound)
+ }
+ testStatusResponse(t, endpoint+"/api/has-key/"+addr+"/"+invalidKey, http.StatusNotFound)
+ }
+ testStatusResponse(t, endpoint+"/api/has-key/"+invalidAddr+"/"+invalidKey, http.StatusNotFound)
+ })
+
+ t.Run("keys", func(t *testing.T) {
+ var keys []string
+ for key := range keyNodes {
+ keys = append(keys, key)
+ }
+ sort.Strings(keys)
+ testKeysResponse(t, endpoint+"/api/keys", explorer.KeysResponse{
+ Keys: keys,
+ })
+ })
+
+ t.Run("nodes", func(t *testing.T) {
+ var nodes []string
+ for addr := range nodeKeys {
+ nodes = append(nodes, common.HexToAddress(addr).Hex())
+ }
+ sort.Strings(nodes)
+ testNodesResponse(t, endpoint+"/api/nodes", explorer.NodesResponse{
+ Nodes: nodes,
+ })
+ })
+
+ t.Run("node keys", func(t *testing.T) {
+ for addr, keys := range nodeKeys {
+ testKeysResponse(t, endpoint+"/api/keys?node="+addr, explorer.KeysResponse{
+ Keys: keys,
+ })
+ }
+ testKeysResponse(t, endpoint+"/api/keys?node="+invalidAddr, explorer.KeysResponse{})
+ })
+
+ t.Run("key nodes", func(t *testing.T) {
+ for key, addrs := range keyNodes {
+ var nodes []string
+ for _, addr := range addrs {
+ nodes = append(nodes, common.HexToAddress(addr).Hex())
+ }
+ sort.Strings(nodes)
+ testNodesResponse(t, endpoint+"/api/nodes?key="+key, explorer.NodesResponse{
+ Nodes: nodes,
+ })
+ }
+ testNodesResponse(t, endpoint+"/api/nodes?key="+invalidKey, explorer.NodesResponse{})
+ })
+}
+
+// TestExplorer_CORSOrigin validates if chunk explorer returns
+// correct CORS origin header in GET and OPTIONS requests.
+func TestExplorer_CORSOrigin(t *testing.T) {
+ origin := "http://localhost/"
+ addr := findFreeTCPAddress(t)
+ explorerAddr := findFreeTCPAddress(t)
+ testCmd := runGlobalStore(t, "ws",
+ "--addr", addr,
+ "--explorer-address", explorerAddr,
+ "--explorer-cors-origin", origin,
+ )
+ defer testCmd.Kill()
+
+ // wait until the server is started
+ waitHTTPEndpoint(t, explorerAddr)
+
+ url := "http://" + explorerAddr + "/api/keys"
+
+ t.Run("get", func(t *testing.T) {
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ req.Header.Set("Origin", origin)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ header := resp.Header.Get("Access-Control-Allow-Origin")
+ if header != origin {
+ t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, origin)
+ }
+ })
+
+ t.Run("preflight", func(t *testing.T) {
+ req, err := http.NewRequest(http.MethodOptions, url, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ req.Header.Set("Origin", origin)
+ req.Header.Set("Access-Control-Request-Method", "GET")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ header := resp.Header.Get("Access-Control-Allow-Origin")
+ if header != origin {
+ t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, origin)
+ }
+ })
+}
+
+// testStatusResponse makes an http request to provided url
+// and validates if response is explorer.StatusResponse for
+// the expected status code.
+func testStatusResponse(t *testing.T, url string, code int) {
+ t.Helper()
+
+ resp, err := http.Get(url)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if resp.StatusCode != code {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, code)
+ }
+ var r explorer.StatusResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if r.Code != code {
+ t.Errorf("got response code %v, want %v", r.Code, code)
+ }
+ if r.Message != http.StatusText(code) {
+ t.Errorf("got response message %q, want %q", r.Message, http.StatusText(code))
+ }
+}
+
+// testKeysResponse makes an http request to provided url
+// and validates if response machhes expected explorer.KeysResponse.
+func testKeysResponse(t *testing.T, url string, want explorer.KeysResponse) {
+ t.Helper()
+
+ resp, err := http.Get(url)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK)
+ }
+ var r explorer.KeysResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if fmt.Sprint(r.Keys) != fmt.Sprint(want.Keys) {
+ t.Errorf("got keys %v, want %v", r.Keys, want.Keys)
+ }
+ if r.Next != want.Next {
+ t.Errorf("got next %s, want %s", r.Next, want.Next)
+ }
+}
+
+// testNodeResponse makes an http request to provided url
+// and validates if response machhes expected explorer.NodeResponse.
+func testNodesResponse(t *testing.T, url string, want explorer.NodesResponse) {
+ t.Helper()
+
+ resp, err := http.Get(url)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK)
+ }
+ var r explorer.NodesResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if fmt.Sprint(r.Nodes) != fmt.Sprint(want.Nodes) {
+ t.Errorf("got nodes %v, want %v", r.Nodes, want.Nodes)
+ }
+ if r.Next != want.Next {
+ t.Errorf("got next %s, want %s", r.Next, want.Next)
+ }
+}
diff --git a/cmd/swarm/global-store/global_store.go b/cmd/swarm/global-store/global_store.go
index a55756e1c..f93b464db 100644
--- a/cmd/swarm/global-store/global_store.go
+++ b/cmd/swarm/global-store/global_store.go
@@ -17,6 +17,7 @@
package main
import (
+ "io"
"net"
"net/http"
"os"
@@ -66,7 +67,7 @@ func startWS(ctx *cli.Context) (err error) {
return http.Serve(listener, server.WebsocketHandler(origins))
}
-// newServer creates a global store and returns its RPC server.
+// newServer creates a global store and starts a chunk explorer server if configured.
// Returned cleanup function should be called only if err is nil.
func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error) {
log.PrintOrigins(true)
@@ -81,7 +82,9 @@ func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error)
return nil, nil, err
}
cleanup = func() {
- dbStore.Close()
+ if err := dbStore.Close(); err != nil {
+ log.Error("global store: close", "err", err)
+ }
}
globalStore = dbStore
log.Info("database global store", "dir", dir)
@@ -96,5 +99,22 @@ func newServer(ctx *cli.Context) (server *rpc.Server, cleanup func(), err error)
return nil, nil, err
}
+ shutdown, err := serveChunkExplorer(ctx, globalStore)
+ if err != nil {
+ cleanup()
+ return nil, nil, err
+ }
+ if shutdown != nil {
+ cleanup = func() {
+ shutdown()
+
+ if c, ok := globalStore.(io.Closer); ok {
+ if err := c.Close(); err != nil {
+ log.Error("global store: close", "err", err)
+ }
+ }
+ }
+ }
+
return server, cleanup, nil
}
diff --git a/cmd/swarm/global-store/global_store_test.go b/cmd/swarm/global-store/global_store_test.go
index 63f1c5389..c437c9d45 100644
--- a/cmd/swarm/global-store/global_store_test.go
+++ b/cmd/swarm/global-store/global_store_test.go
@@ -69,16 +69,7 @@ func testHTTP(t *testing.T, put bool, args ...string) {
// wait until global store process is started as
// rpc.DialHTTP is actually not connecting
- for i := 0; i < 1000; i++ {
- _, err = http.DefaultClient.Get("http://" + addr)
- if err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- t.Fatal(err)
- }
+ waitHTTPEndpoint(t, addr)
store := mockRPC.NewGlobalStore(client)
defer store.Close()
@@ -137,19 +128,7 @@ func testWebsocket(t *testing.T, put bool, args ...string) {
testCmd := runGlobalStore(t, append([]string{"ws", "--addr", addr}, args...)...)
defer testCmd.Kill()
- var client *rpc.Client
- var err error
- // wait until global store process is started
- for i := 0; i < 1000; i++ {
- client, err = rpc.DialWebsocket(context.Background(), "ws://"+addr, "")
- if err == nil {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- t.Fatal(err)
- }
+ client := websocketClient(t, addr)
store := mockRPC.NewGlobalStore(client)
defer store.Close()
@@ -160,7 +139,7 @@ func testWebsocket(t *testing.T, put bool, args ...string) {
wantValue := "value"
if put {
- err = node.Put([]byte(wantKey), []byte(wantValue))
+ err := node.Put([]byte(wantKey), []byte(wantValue))
if err != nil {
t.Fatal(err)
}
@@ -189,3 +168,40 @@ func findFreeTCPAddress(t *testing.T) (addr string) {
return listener.Addr().String()
}
+
+// websocketClient waits until global store process is started
+// and returns rpc client.
+func websocketClient(t *testing.T, addr string) (client *rpc.Client) {
+ t.Helper()
+
+ var err error
+ for i := 0; i < 1000; i++ {
+ client, err = rpc.DialWebsocket(context.Background(), "ws://"+addr, "")
+ if err == nil {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+ return client
+}
+
+// waitHTTPEndpoint retries http requests to a provided
+// address until the connection is established.
+func waitHTTPEndpoint(t *testing.T, addr string) {
+ t.Helper()
+
+ var err error
+ for i := 0; i < 1000; i++ {
+ _, err = http.Get("http://" + addr)
+ if err == nil {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/cmd/swarm/global-store/main.go b/cmd/swarm/global-store/main.go
index 51df0099a..52fafc8f6 100644
--- a/cmd/swarm/global-store/main.go
+++ b/cmd/swarm/global-store/main.go
@@ -19,12 +19,14 @@ package main
import (
"os"
- "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/log"
cli "gopkg.in/urfave/cli.v1"
)
-var gitCommit string // Git SHA1 commit hash of the release (set via linker flags)
+var (
+ version = "0.1"
+ gitCommit string // Git SHA1 commit hash of the release (set via linker flags)
+)
func main() {
err := newApp().Run(os.Args)
@@ -37,16 +39,30 @@ func main() {
// newApp construct a new instance of Swarm Global Store.
// Method Run is called on it in the main function and in tests.
func newApp() (app *cli.App) {
- app = utils.NewApp(gitCommit, "Swarm Global Store")
-
+ app = cli.NewApp()
app.Name = "global-store"
+ app.Version = version
+ if len(gitCommit) >= 8 {
+ app.Version += "-" + gitCommit[:8]
+ }
+ app.Usage = "Swarm Global Store"
// app flags (for all commands)
app.Flags = []cli.Flag{
cli.IntFlag{
Name: "verbosity",
Value: 3,
- Usage: "verbosity level",
+ Usage: "Verbosity level.",
+ },
+ cli.StringFlag{
+ Name: "explorer-address",
+ Value: "",
+ Usage: "Chunk explorer HTTP listener address.",
+ },
+ cli.StringSliceFlag{
+ Name: "explorer-cors-origin",
+ Value: nil,
+ Usage: "Chunk explorer CORS origin (can be specified multiple times).",
},
}
@@ -54,7 +70,7 @@ func newApp() (app *cli.App) {
{
Name: "http",
Aliases: []string{"h"},
- Usage: "start swarm global store with http server",
+ Usage: "Start swarm global store with HTTP server.",
Action: startHTTP,
// Flags only for "start" command.
// Allow app flags to be specified after the
@@ -63,19 +79,19 @@ func newApp() (app *cli.App) {
cli.StringFlag{
Name: "dir",
Value: "",
- Usage: "data directory",
+ Usage: "Data directory.",
},
cli.StringFlag{
Name: "addr",
Value: "0.0.0.0:3033",
- Usage: "address to listen for http connection",
+ Usage: "Address to listen for HTTP connections.",
},
),
},
{
Name: "websocket",
Aliases: []string{"ws"},
- Usage: "start swarm global store with websocket server",
+ Usage: "Start swarm global store with WebSocket server.",
Action: startWS,
// Flags only for "start" command.
// Allow app flags to be specified after the
@@ -84,17 +100,17 @@ func newApp() (app *cli.App) {
cli.StringFlag{
Name: "dir",
Value: "",
- Usage: "data directory",
+ Usage: "Data directory.",
},
cli.StringFlag{
Name: "addr",
Value: "0.0.0.0:3033",
- Usage: "address to listen for websocket connection",
+ Usage: "Address to listen for WebSocket connections.",
},
cli.StringSliceFlag{
- Name: "origins",
- Value: &cli.StringSlice{"*"},
- Usage: "websocket origins",
+ Name: "origin",
+ Value: nil,
+ Usage: "WebSocket CORS origin (can be specified multiple times).",
},
),
},
diff --git a/swarm/storage/mock/db/db.go b/swarm/storage/mock/db/db.go
index 73ae199e8..313a61b43 100644
--- a/swarm/storage/mock/db/db.go
+++ b/swarm/storage/mock/db/db.go
@@ -21,8 +21,12 @@ import (
"archive/tar"
"bytes"
"encoding/json"
+ "errors"
+ "fmt"
"io"
"io/ioutil"
+ "sync"
+ "time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
@@ -37,6 +41,10 @@ import (
// release resources used by the database.
type GlobalStore struct {
db *leveldb.DB
+ // protects nodes and keys indexes
+ // in Put and Delete methods
+ nodesLocks sync.Map
+ keysLocks sync.Map
}
// NewGlobalStore creates a new instance of GlobalStore.
@@ -64,14 +72,14 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore {
// Get returns chunk data if the chunk with key exists for node
// on address addr.
func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) {
- has, err := s.db.Has(nodeDBKey(addr, key), nil)
+ has, err := s.db.Has(indexForHashesPerNode(addr, key), nil)
if err != nil {
return nil, mock.ErrNotFound
}
if !has {
return nil, mock.ErrNotFound
}
- data, err = s.db.Get(dataDBKey(key), nil)
+ data, err = s.db.Get(indexDataKey(key), nil)
if err == leveldb.ErrNotFound {
err = mock.ErrNotFound
}
@@ -80,28 +88,165 @@ func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err err
// Put saves the chunk data for node with address addr.
func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error {
+ unlock, err := s.lock(addr, key)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+
batch := new(leveldb.Batch)
- batch.Put(nodeDBKey(addr, key), nil)
- batch.Put(dataDBKey(key), data)
+ batch.Put(indexForHashesPerNode(addr, key), nil)
+ batch.Put(indexForNodesWithHash(key, addr), nil)
+ batch.Put(indexForNodes(addr), nil)
+ batch.Put(indexForHashes(key), nil)
+ batch.Put(indexDataKey(key), data)
return s.db.Write(batch, nil)
}
// Delete removes the chunk reference to node with address addr.
func (s *GlobalStore) Delete(addr common.Address, key []byte) error {
+ unlock, err := s.lock(addr, key)
+ if err != nil {
+ return err
+ }
+ defer unlock()
+
batch := new(leveldb.Batch)
- batch.Delete(nodeDBKey(addr, key))
+ batch.Delete(indexForHashesPerNode(addr, key))
+ batch.Delete(indexForNodesWithHash(key, addr))
+
+ // check if this node contains any keys, and if not
+ // remove it from the
+ x := indexForHashesPerNodePrefix(addr)
+ if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) {
+ batch.Delete(indexForNodes(addr))
+ }
+
+ x = indexForNodesWithHashPrefix(key)
+ if k, _ := s.db.Get(x, nil); !bytes.HasPrefix(k, x) {
+ batch.Delete(indexForHashes(key))
+ }
return s.db.Write(batch, nil)
}
// HasKey returns whether a node with addr contains the key.
func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool {
- has, err := s.db.Has(nodeDBKey(addr, key), nil)
+ has, err := s.db.Has(indexForHashesPerNode(addr, key), nil)
if err != nil {
has = false
}
return has
}
+// Keys returns a paginated list of keys on all nodes.
+func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) {
+ return s.keys(nil, startKey, limit)
+}
+
+// Nodes returns a paginated list of all known nodes.
+func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ return s.nodes(nil, startAddr, limit)
+}
+
+// NodeKeys returns a paginated list of keys on a node with provided address.
+func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) {
+ return s.keys(&addr, startKey, limit)
+}
+
+// KeyNodes returns a paginated list of nodes that contain a particular key.
+func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ return s.nodes(key, startAddr, limit)
+}
+
+// keys returns a paginated list of keys. If addr is not nil, only keys on that
+// node will be returned.
+func (s *GlobalStore) keys(addr *common.Address, startKey []byte, limit int) (keys mock.Keys, err error) {
+ iter := s.db.NewIterator(nil, nil)
+ defer iter.Release()
+
+ if limit <= 0 {
+ limit = mock.DefaultLimit
+ }
+
+ prefix := []byte{indexForHashesPrefix}
+ if addr != nil {
+ prefix = indexForHashesPerNodePrefix(*addr)
+ }
+ if startKey != nil {
+ if addr != nil {
+ startKey = indexForHashesPerNode(*addr, startKey)
+ } else {
+ startKey = indexForHashes(startKey)
+ }
+ } else {
+ startKey = prefix
+ }
+
+ ok := iter.Seek(startKey)
+ if !ok {
+ return keys, iter.Error()
+ }
+ for ; ok; ok = iter.Next() {
+ k := iter.Key()
+ if !bytes.HasPrefix(k, prefix) {
+ break
+ }
+ key := append([]byte(nil), bytes.TrimPrefix(k, prefix)...)
+
+ if len(keys.Keys) >= limit {
+ keys.Next = key
+ break
+ }
+
+ keys.Keys = append(keys.Keys, key)
+ }
+ return keys, iter.Error()
+}
+
+// nodes returns a paginated list of node addresses. If key is not nil,
+// only nodes that contain that key will be returned.
+func (s *GlobalStore) nodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ iter := s.db.NewIterator(nil, nil)
+ defer iter.Release()
+
+ if limit <= 0 {
+ limit = mock.DefaultLimit
+ }
+
+ prefix := []byte{indexForNodesPrefix}
+ if key != nil {
+ prefix = indexForNodesWithHashPrefix(key)
+ }
+ startKey := prefix
+ if startAddr != nil {
+ if key != nil {
+ startKey = indexForNodesWithHash(key, *startAddr)
+ } else {
+ startKey = indexForNodes(*startAddr)
+ }
+ }
+
+ ok := iter.Seek(startKey)
+ if !ok {
+ return nodes, iter.Error()
+ }
+ for ; ok; ok = iter.Next() {
+ k := iter.Key()
+ if !bytes.HasPrefix(k, prefix) {
+ break
+ }
+ addr := common.BytesToAddress(append([]byte(nil), bytes.TrimPrefix(k, prefix)...))
+
+ if len(nodes.Addrs) >= limit {
+ nodes.Next = &addr
+ break
+ }
+
+ nodes.Addrs = append(nodes.Addrs, addr)
+ }
+ return nodes, iter.Error()
+}
+
// Import reads tar archive from a reader that contains exported chunk data.
// It returns the number of chunks imported and an error.
func (s *GlobalStore) Import(r io.Reader) (n int, err error) {
@@ -126,12 +271,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) {
return n, err
}
+ key := common.Hex2Bytes(hdr.Name)
+
batch := new(leveldb.Batch)
for _, addr := range c.Addrs {
- batch.Put(nodeDBKeyHex(addr, hdr.Name), nil)
+ batch.Put(indexForHashesPerNode(addr, key), nil)
+ batch.Put(indexForNodesWithHash(key, addr), nil)
+ batch.Put(indexForNodes(addr), nil)
}
- batch.Put(dataDBKey(common.Hex2Bytes(hdr.Name)), c.Data)
+ batch.Put(indexForHashes(key), nil)
+ batch.Put(indexDataKey(key), c.Data)
+
if err = s.db.Write(batch, nil); err != nil {
return n, err
}
@@ -150,18 +301,23 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
encoder := json.NewEncoder(buf)
- iter := s.db.NewIterator(util.BytesPrefix(nodeKeyPrefix), nil)
+ snap, err := s.db.GetSnapshot()
+ if err != nil {
+ return 0, err
+ }
+
+ iter := snap.NewIterator(util.BytesPrefix([]byte{indexForHashesByNodePrefix}), nil)
defer iter.Release()
var currentKey string
var addrs []common.Address
- saveChunk := func(hexKey string) error {
- key := common.Hex2Bytes(hexKey)
+ saveChunk := func() error {
+ hexKey := currentKey
- data, err := s.db.Get(dataDBKey(key), nil)
+ data, err := snap.Get(indexDataKey(common.Hex2Bytes(hexKey)), nil)
if err != nil {
- return err
+ return fmt.Errorf("get data %s: %v", hexKey, err)
}
buf.Reset()
@@ -189,8 +345,8 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) {
}
for iter.Next() {
- k := bytes.TrimPrefix(iter.Key(), nodeKeyPrefix)
- i := bytes.Index(k, []byte("-"))
+ k := bytes.TrimPrefix(iter.Key(), []byte{indexForHashesByNodePrefix})
+ i := bytes.Index(k, []byte{keyTermByte})
if i < 0 {
continue
}
@@ -201,7 +357,7 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) {
}
if hexKey != currentKey {
- if err = saveChunk(currentKey); err != nil {
+ if err = saveChunk(); err != nil {
return n, err
}
@@ -209,35 +365,112 @@ func (s *GlobalStore) Export(w io.Writer) (n int, err error) {
}
currentKey = hexKey
- addrs = append(addrs, common.BytesToAddress(k[i:]))
+ addrs = append(addrs, common.BytesToAddress(k[i+1:]))
}
if len(addrs) > 0 {
- if err = saveChunk(currentKey); err != nil {
+ if err = saveChunk(); err != nil {
return n, err
}
}
- return n, err
+ return n, iter.Error()
}
var (
- nodeKeyPrefix = []byte("node-")
- dataKeyPrefix = []byte("data-")
+ // maximal time for lock to wait until it returns error
+ lockTimeout = 3 * time.Second
+ // duration between two lock checks.
+ lockCheckDelay = 30 * time.Microsecond
+ // error returned by lock method when lock timeout is reached
+ errLockTimeout = errors.New("lock timeout")
)
-// nodeDBKey constructs a database key for key/node mappings.
-func nodeDBKey(addr common.Address, key []byte) []byte {
- return nodeDBKeyHex(addr, common.Bytes2Hex(key))
+// lock protects parallel writes in Put and Delete methods for both
+// node with provided address and for data with provided key.
+func (s *GlobalStore) lock(addr common.Address, key []byte) (unlock func(), err error) {
+ start := time.Now()
+ nodeLockKey := addr.Hex()
+ for {
+ _, loaded := s.nodesLocks.LoadOrStore(nodeLockKey, struct{}{})
+ if !loaded {
+ break
+ }
+ time.Sleep(lockCheckDelay)
+ if time.Since(start) > lockTimeout {
+ return nil, errLockTimeout
+ }
+ }
+ start = time.Now()
+ keyLockKey := common.Bytes2Hex(key)
+ for {
+ _, loaded := s.keysLocks.LoadOrStore(keyLockKey, struct{}{})
+ if !loaded {
+ break
+ }
+ time.Sleep(lockCheckDelay)
+ if time.Since(start) > lockTimeout {
+ return nil, errLockTimeout
+ }
+ }
+ return func() {
+ s.nodesLocks.Delete(nodeLockKey)
+ s.keysLocks.Delete(keyLockKey)
+ }, nil
}
-// nodeDBKeyHex constructs a database key for key/node mappings
-// using the hexadecimal string representation of the key.
-func nodeDBKeyHex(addr common.Address, hexKey string) []byte {
- return append(append(nodeKeyPrefix, []byte(hexKey+"-")...), addr[:]...)
+const (
+ // prefixes for different indexes
+ indexDataPrefix = 0
+ indexForNodesWithHashesPrefix = 1
+ indexForHashesByNodePrefix = 2
+ indexForNodesPrefix = 3
+ indexForHashesPrefix = 4
+
+ // keyTermByte splits keys and node addresses
+ // in database keys
+ keyTermByte = 0xff
+)
+
+// indexForHashesPerNode constructs a database key to store keys used in
+// NodeKeys method.
+func indexForHashesPerNode(addr common.Address, key []byte) []byte {
+ return append(indexForHashesPerNodePrefix(addr), key...)
}
-// dataDBkey constructs a database key for key/data storage.
-func dataDBKey(key []byte) []byte {
- return append(dataKeyPrefix, key...)
+// indexForHashesPerNodePrefix returns a prefix containing a node address used in
+// NodeKeys method. Node address is hex encoded to be able to use keyTermByte
+// for splitting node address and key.
+func indexForHashesPerNodePrefix(addr common.Address) []byte {
+ return append([]byte{indexForNodesWithHashesPrefix}, append([]byte(addr.Hex()), keyTermByte)...)
+}
+
+// indexForNodesWithHash constructs a database key to store keys used in
+// KeyNodes method.
+func indexForNodesWithHash(key []byte, addr common.Address) []byte {
+ return append(indexForNodesWithHashPrefix(key), addr[:]...)
+}
+
+// indexForNodesWithHashPrefix returns a prefix containing a key used in
+// KeyNodes method. Key is hex encoded to be able to use keyTermByte
+// for splitting key and node address.
+func indexForNodesWithHashPrefix(key []byte) []byte {
+ return append([]byte{indexForHashesByNodePrefix}, append([]byte(common.Bytes2Hex(key)), keyTermByte)...)
+}
+
+// indexForNodes constructs a database key to store keys used in
+// Nodes method.
+func indexForNodes(addr common.Address) []byte {
+ return append([]byte{indexForNodesPrefix}, addr[:]...)
+}
+
+// indexForHashes constructs a database key to store keys used in
+// Keys method.
+func indexForHashes(key []byte) []byte {
+ return append([]byte{indexForHashesPrefix}, key...)
+}
+
+// indexDataKey constructs a database key for key/data storage.
+func indexDataKey(key []byte) []byte {
+ return append([]byte{indexDataPrefix}, key...)
}
diff --git a/swarm/storage/mock/db/db_test.go b/swarm/storage/mock/db/db_test.go
index 782faaf35..efbf942f6 100644
--- a/swarm/storage/mock/db/db_test.go
+++ b/swarm/storage/mock/db/db_test.go
@@ -1,5 +1,3 @@
-// +build go1.8
-//
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
@@ -29,47 +27,49 @@ import (
// TestDBStore is running a test.MockStore tests
// using test.MockStore function.
func TestDBStore(t *testing.T) {
- dir, err := ioutil.TempDir("", "mock_"+t.Name())
- if err != nil {
- panic(err)
- }
- defer os.RemoveAll(dir)
-
- store, err := NewGlobalStore(dir)
- if err != nil {
- t.Fatal(err)
- }
- defer store.Close()
+ store, cleanup := newTestStore(t)
+ defer cleanup()
test.MockStore(t, store, 100)
}
+// TestDBStoreListings is running test.MockStoreListings tests.
+func TestDBStoreListings(t *testing.T) {
+ store, cleanup := newTestStore(t)
+ defer cleanup()
+
+ test.MockStoreListings(t, store, 1000)
+}
+
// TestImportExport is running a test.ImportExport tests
// using test.MockStore function.
func TestImportExport(t *testing.T) {
- dir1, err := ioutil.TempDir("", "mock_"+t.Name()+"_exporter")
- if err != nil {
- panic(err)
- }
- defer os.RemoveAll(dir1)
+ store1, cleanup := newTestStore(t)
+ defer cleanup()
- store1, err := NewGlobalStore(dir1)
- if err != nil {
- t.Fatal(err)
- }
- defer store1.Close()
-
- dir2, err := ioutil.TempDir("", "mock_"+t.Name()+"_importer")
- if err != nil {
- panic(err)
- }
- defer os.RemoveAll(dir2)
-
- store2, err := NewGlobalStore(dir2)
- if err != nil {
- t.Fatal(err)
- }
- defer store2.Close()
+ store2, cleanup := newTestStore(t)
+ defer cleanup()
test.ImportExport(t, store1, store2, 100)
}
+
+// newTestStore creates a temporary GlobalStore
+// that will be closed and data deleted when
+// calling returned cleanup function.
+func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) {
+ dir, err := ioutil.TempDir("", "swarm-mock-db-")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ s, err = NewGlobalStore(dir)
+ if err != nil {
+ os.RemoveAll(dir)
+ t.Fatal(err)
+ }
+
+ return s, func() {
+ s.Close()
+ os.RemoveAll(dir)
+ }
+}
diff --git a/swarm/storage/mock/explorer/explorer.go b/swarm/storage/mock/explorer/explorer.go
new file mode 100644
index 000000000..8fffff8fd
--- /dev/null
+++ b/swarm/storage/mock/explorer/explorer.go
@@ -0,0 +1,257 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package explorer
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+ "github.com/rs/cors"
+)
+
+const jsonContentType = "application/json; charset=utf-8"
+
+// NewHandler constructs an http.Handler with router
+// that servers requests required by chunk explorer.
+//
+// /api/has-key/{node}/{key}
+// /api/keys?start={key}&node={node}&limit={int[0..1000]}
+// /api/nodes?start={node}&key={key}&limit={int[0..1000]}
+//
+// Data from global store will be served and appropriate
+// CORS headers will be sent if allowed origins are provided.
+func NewHandler(store mock.GlobalStorer, corsOrigins []string) (handler http.Handler) {
+ mux := http.NewServeMux()
+ mux.Handle("/api/has-key/", newHasKeyHandler(store))
+ mux.Handle("/api/keys", newKeysHandler(store))
+ mux.Handle("/api/nodes", newNodesHandler(store))
+ mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ jsonStatusResponse(w, http.StatusNotFound)
+ })
+ handler = noCacheHandler(mux)
+ if corsOrigins != nil {
+ handler = cors.New(cors.Options{
+ AllowedOrigins: corsOrigins,
+ AllowedMethods: []string{"GET"},
+ MaxAge: 600,
+ }).Handler(handler)
+ }
+ return handler
+}
+
+// newHasKeyHandler returns a new handler that serves
+// requests for HasKey global store method.
+// Possible responses are StatusResponse with
+// status codes 200 or 404 if the chunk is found or not.
+func newHasKeyHandler(store mock.GlobalStorer) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ addr, key, ok := parseHasKeyPath(r.URL.Path)
+ if !ok {
+ jsonStatusResponse(w, http.StatusNotFound)
+ return
+ }
+ found := store.HasKey(addr, key)
+ if !found {
+ jsonStatusResponse(w, http.StatusNotFound)
+ return
+ }
+ jsonStatusResponse(w, http.StatusOK)
+ }
+}
+
+// KeysResponse is a JSON-encoded response for global store
+// Keys and NodeKeys methods.
+type KeysResponse struct {
+ Keys []string `json:"keys"`
+ Next string `json:"next,omitempty"`
+}
+
+// newKeysHandler returns a new handler that serves
+// requests for Key global store method.
+// HTTP response body will be JSON-encoded KeysResponse.
+func newKeysHandler(store mock.GlobalStorer) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ q := r.URL.Query()
+ node := q.Get("node")
+ start, limit := listingPage(q)
+
+ var keys mock.Keys
+ if node == "" {
+ var err error
+ keys, err = store.Keys(common.Hex2Bytes(start), limit)
+ if err != nil {
+ log.Error("chunk explorer: keys handler: get keys", "start", start, "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ } else {
+ var err error
+ keys, err = store.NodeKeys(common.HexToAddress(node), common.Hex2Bytes(start), limit)
+ if err != nil {
+ log.Error("chunk explorer: keys handler: get node keys", "node", node, "start", start, "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ }
+ ks := make([]string, len(keys.Keys))
+ for i, k := range keys.Keys {
+ ks[i] = common.Bytes2Hex(k)
+ }
+ data, err := json.Marshal(KeysResponse{
+ Keys: ks,
+ Next: common.Bytes2Hex(keys.Next),
+ })
+ if err != nil {
+ log.Error("chunk explorer: keys handler: json marshal", "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", jsonContentType)
+ _, err = io.Copy(w, bytes.NewReader(data))
+ if err != nil {
+ log.Error("chunk explorer: keys handler: write response", "err", err)
+ }
+ }
+}
+
+// NodesResponse is a JSON-encoded response for global store
+// Nodes and KeyNodes methods.
+type NodesResponse struct {
+ Nodes []string `json:"nodes"`
+ Next string `json:"next,omitempty"`
+}
+
+// newNodesHandler returns a new handler that serves
+// requests for Nodes global store method.
+// HTTP response body will be JSON-encoded NodesResponse.
+func newNodesHandler(store mock.GlobalStorer) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ q := r.URL.Query()
+ key := q.Get("key")
+ var start *common.Address
+ queryStart, limit := listingPage(q)
+ if queryStart != "" {
+ s := common.HexToAddress(queryStart)
+ start = &s
+ }
+
+ var nodes mock.Nodes
+ if key == "" {
+ var err error
+ nodes, err = store.Nodes(start, limit)
+ if err != nil {
+ log.Error("chunk explorer: nodes handler: get nodes", "start", queryStart, "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ } else {
+ var err error
+ nodes, err = store.KeyNodes(common.Hex2Bytes(key), start, limit)
+ if err != nil {
+ log.Error("chunk explorer: nodes handler: get key nodes", "key", key, "start", queryStart, "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ }
+ ns := make([]string, len(nodes.Addrs))
+ for i, n := range nodes.Addrs {
+ ns[i] = n.Hex()
+ }
+ var next string
+ if nodes.Next != nil {
+ next = nodes.Next.Hex()
+ }
+ data, err := json.Marshal(NodesResponse{
+ Nodes: ns,
+ Next: next,
+ })
+ if err != nil {
+ log.Error("chunk explorer: nodes handler", "err", err)
+ jsonStatusResponse(w, http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", jsonContentType)
+ _, err = io.Copy(w, bytes.NewReader(data))
+ if err != nil {
+ log.Error("chunk explorer: nodes handler: write response", "err", err)
+ }
+ }
+}
+
+// parseHasKeyPath extracts address and key from HTTP request
+// path for HasKey route: /api/has-key/{node}/{key}.
+// If ok is false, the provided path is not matched.
+func parseHasKeyPath(p string) (addr common.Address, key []byte, ok bool) {
+ p = strings.TrimPrefix(p, "/api/has-key/")
+ parts := strings.SplitN(p, "/", 2)
+ if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
+ return addr, nil, false
+ }
+ addr = common.HexToAddress(parts[0])
+ key = common.Hex2Bytes(parts[1])
+ return addr, key, true
+}
+
+// listingPage returns start value and listing limit
+// from url query values.
+func listingPage(q url.Values) (start string, limit int) {
+ // if limit is not a valid integer (or blank string),
+ // ignore the error and use the returned 0 value
+ limit, _ = strconv.Atoi(q.Get("limit"))
+ return q.Get("start"), limit
+}
+
+// StatusResponse is a standardized JSON-encoded response
+// that contains information about HTTP response code
+// for easier status identification.
+type StatusResponse struct {
+ Message string `json:"message"`
+ Code int `json:"code"`
+}
+
+// jsonStatusResponse writes to the response writer
+// JSON-encoded StatusResponse based on the provided status code.
+func jsonStatusResponse(w http.ResponseWriter, code int) {
+ w.Header().Set("Content-Type", jsonContentType)
+ w.WriteHeader(code)
+ err := json.NewEncoder(w).Encode(StatusResponse{
+ Message: http.StatusText(code),
+ Code: code,
+ })
+ if err != nil {
+ log.Error("chunk explorer: json status response", "err", err)
+ }
+}
+
+// noCacheHandler sets required HTTP headers to prevent
+// response caching at the client side.
+func noCacheHandler(h http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
+ w.Header().Set("Pragma", "no-cache")
+ w.Header().Set("Expires", "0")
+ h.ServeHTTP(w, r)
+ })
+}
diff --git a/swarm/storage/mock/explorer/explorer_test.go b/swarm/storage/mock/explorer/explorer_test.go
new file mode 100644
index 000000000..be2668426
--- /dev/null
+++ b/swarm/storage/mock/explorer/explorer_test.go
@@ -0,0 +1,471 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package explorer
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
+ "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
+)
+
+// TestHandler_memGlobalStore runs a set of tests
+// to validate handler with mem global store.
+func TestHandler_memGlobalStore(t *testing.T) {
+ t.Parallel()
+
+ globalStore := mem.NewGlobalStore()
+
+ testHandler(t, globalStore)
+}
+
+// TestHandler_dbGlobalStore runs a set of tests
+// to validate handler with database global store.
+func TestHandler_dbGlobalStore(t *testing.T) {
+ t.Parallel()
+
+ dir, err := ioutil.TempDir("", "swarm-mock-explorer-db-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dir)
+
+ globalStore, err := db.NewGlobalStore(dir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer globalStore.Close()
+
+ testHandler(t, globalStore)
+}
+
+// testHandler stores data distributed by node addresses
+// and validates if this data is correctly retrievable
+// by using the http.Handler returned by NewHandler function.
+// This test covers all HTTP routes and various get parameters
+// on them to check paginated results.
+func testHandler(t *testing.T, globalStore mock.GlobalStorer) {
+ const (
+ nodeCount = 350
+ keyCount = 250
+ keysOnNodeCount = 150
+ )
+
+ // keys for every node
+ nodeKeys := make(map[string][]string)
+
+ // a node address that is not present in global store
+ invalidAddr := "0x7b8b72938c254cf002c4e1e714d27e022be88d93"
+
+ // a key that is not present in global store
+ invalidKey := "f9824192fb515cfb"
+
+ for i := 1; i <= nodeCount; i++ {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(i))
+ addr := common.BytesToAddress(b).Hex()
+ nodeKeys[addr] = make([]string, 0)
+ }
+
+ for i := 1; i <= keyCount; i++ {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(i))
+
+ key := common.Bytes2Hex(b)
+
+ var c int
+ for addr := range nodeKeys {
+ nodeKeys[addr] = append(nodeKeys[addr], key)
+ c++
+ if c >= keysOnNodeCount {
+ break
+ }
+ }
+ }
+
+ // sort keys for every node as they are expected to be
+ // sorted in HTTP responses
+ for _, keys := range nodeKeys {
+ sort.Strings(keys)
+ }
+
+ // nodes for every key
+ keyNodes := make(map[string][]string)
+
+ // construct a reverse mapping of nodes for every key
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ keyNodes[key] = append(keyNodes[key], addr)
+ }
+ }
+
+ // sort node addresses with case insensitive sort,
+ // as hex letters in node addresses are in mixed caps
+ for _, addrs := range keyNodes {
+ sortCaseInsensitive(addrs)
+ }
+
+ // find a key that is not stored at the address
+ var (
+ unmatchedAddr string
+ unmatchedKey string
+ )
+ for addr, keys := range nodeKeys {
+ for key := range keyNodes {
+ var found bool
+ for _, k := range keys {
+ if k == key {
+ found = true
+ break
+ }
+ }
+ if !found {
+ unmatchedAddr = addr
+ unmatchedKey = key
+ }
+ break
+ }
+ if unmatchedAddr != "" {
+ break
+ }
+ }
+ // check if unmatched key/address pair is found
+ if unmatchedAddr == "" || unmatchedKey == "" {
+ t.Fatalf("could not find a key that is not associated with a node")
+ }
+
+ // store the data
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ err := globalStore.Put(common.HexToAddress(addr), common.Hex2Bytes(key), []byte("data"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ }
+
+ handler := NewHandler(globalStore, nil)
+
+ // this subtest confirms that it has uploaded key and that it does not have invalid keys
+ t.Run("has key", func(t *testing.T) {
+ for addr, keys := range nodeKeys {
+ for _, key := range keys {
+ testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+key, http.StatusOK)
+ testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+key, http.StatusNotFound)
+ }
+ testStatusResponse(t, handler, "/api/has-key/"+addr+"/"+invalidKey, http.StatusNotFound)
+ }
+ testStatusResponse(t, handler, "/api/has-key/"+invalidAddr+"/"+invalidKey, http.StatusNotFound)
+ testStatusResponse(t, handler, "/api/has-key/"+unmatchedAddr+"/"+unmatchedKey, http.StatusNotFound)
+ })
+
+ // this subtest confirms that all keys are are listed in correct order with expected pagination
+ t.Run("keys", func(t *testing.T) {
+ var allKeys []string
+ for key := range keyNodes {
+ allKeys = append(allKeys, key)
+ }
+ sort.Strings(allKeys)
+
+ t.Run("limit 0", testKeys(handler, allKeys, 0, ""))
+ t.Run("limit default", testKeys(handler, allKeys, mock.DefaultLimit, ""))
+ t.Run("limit 2x default", testKeys(handler, allKeys, 2*mock.DefaultLimit, ""))
+ t.Run("limit 0.5x default", testKeys(handler, allKeys, mock.DefaultLimit/2, ""))
+ t.Run("limit max", testKeys(handler, allKeys, mock.MaxLimit, ""))
+ t.Run("limit 2x max", testKeys(handler, allKeys, 2*mock.MaxLimit, ""))
+ t.Run("limit negative", testKeys(handler, allKeys, -10, ""))
+ })
+
+ // this subtest confirms that all keys are are listed for every node in correct order
+ // and that for one node different pagination options are correct
+ t.Run("node keys", func(t *testing.T) {
+ var limitCheckAddr string
+
+ for addr, keys := range nodeKeys {
+ testKeys(handler, keys, 0, addr)(t)
+ if limitCheckAddr == "" {
+ limitCheckAddr = addr
+ }
+ }
+ testKeys(handler, nil, 0, invalidAddr)(t)
+
+ limitCheckKeys := nodeKeys[limitCheckAddr]
+ t.Run("limit 0", testKeys(handler, limitCheckKeys, 0, limitCheckAddr))
+ t.Run("limit default", testKeys(handler, limitCheckKeys, mock.DefaultLimit, limitCheckAddr))
+ t.Run("limit 2x default", testKeys(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckAddr))
+ t.Run("limit 0.5x default", testKeys(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckAddr))
+ t.Run("limit max", testKeys(handler, limitCheckKeys, mock.MaxLimit, limitCheckAddr))
+ t.Run("limit 2x max", testKeys(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckAddr))
+ t.Run("limit negative", testKeys(handler, limitCheckKeys, -10, limitCheckAddr))
+ })
+
+ // this subtest confirms that all nodes are are listed in correct order with expected pagination
+ t.Run("nodes", func(t *testing.T) {
+ var allNodes []string
+ for addr := range nodeKeys {
+ allNodes = append(allNodes, addr)
+ }
+ sortCaseInsensitive(allNodes)
+
+ t.Run("limit 0", testNodes(handler, allNodes, 0, ""))
+ t.Run("limit default", testNodes(handler, allNodes, mock.DefaultLimit, ""))
+ t.Run("limit 2x default", testNodes(handler, allNodes, 2*mock.DefaultLimit, ""))
+ t.Run("limit 0.5x default", testNodes(handler, allNodes, mock.DefaultLimit/2, ""))
+ t.Run("limit max", testNodes(handler, allNodes, mock.MaxLimit, ""))
+ t.Run("limit 2x max", testNodes(handler, allNodes, 2*mock.MaxLimit, ""))
+ t.Run("limit negative", testNodes(handler, allNodes, -10, ""))
+ })
+
+ // this subtest confirms that all nodes are are listed that contain a a particular key in correct order
+ // and that for one key different node pagination options are correct
+ t.Run("key nodes", func(t *testing.T) {
+ var limitCheckKey string
+
+ for key, addrs := range keyNodes {
+ testNodes(handler, addrs, 0, key)(t)
+ if limitCheckKey == "" {
+ limitCheckKey = key
+ }
+ }
+ testNodes(handler, nil, 0, invalidKey)(t)
+
+ limitCheckKeys := keyNodes[limitCheckKey]
+ t.Run("limit 0", testNodes(handler, limitCheckKeys, 0, limitCheckKey))
+ t.Run("limit default", testNodes(handler, limitCheckKeys, mock.DefaultLimit, limitCheckKey))
+ t.Run("limit 2x default", testNodes(handler, limitCheckKeys, 2*mock.DefaultLimit, limitCheckKey))
+ t.Run("limit 0.5x default", testNodes(handler, limitCheckKeys, mock.DefaultLimit/2, limitCheckKey))
+ t.Run("limit max", testNodes(handler, limitCheckKeys, mock.MaxLimit, limitCheckKey))
+ t.Run("limit 2x max", testNodes(handler, limitCheckKeys, 2*mock.MaxLimit, limitCheckKey))
+ t.Run("limit negative", testNodes(handler, limitCheckKeys, -10, limitCheckKey))
+ })
+}
+
+// testsKeys returns a test function that validates wantKeys against a series of /api/keys
+// HTTP responses with provided limit and node options.
+func testKeys(handler http.Handler, wantKeys []string, limit int, node string) func(t *testing.T) {
+ return func(t *testing.T) {
+ t.Helper()
+
+ wantLimit := limit
+ if wantLimit <= 0 {
+ wantLimit = mock.DefaultLimit
+ }
+ if wantLimit > mock.MaxLimit {
+ wantLimit = mock.MaxLimit
+ }
+ wantKeysLen := len(wantKeys)
+ var i int
+ var startKey string
+ for {
+ var wantNext string
+ start := i * wantLimit
+ end := (i + 1) * wantLimit
+ if end < wantKeysLen {
+ wantNext = wantKeys[end]
+ } else {
+ end = wantKeysLen
+ }
+ testKeysResponse(t, handler, node, startKey, limit, KeysResponse{
+ Keys: wantKeys[start:end],
+ Next: wantNext,
+ })
+ if wantNext == "" {
+ break
+ }
+ startKey = wantNext
+ i++
+ }
+ }
+}
+
+// testNodes returns a test function that validates wantAddrs against a series of /api/nodes
+// HTTP responses with provided limit and key options.
+func testNodes(handler http.Handler, wantAddrs []string, limit int, key string) func(t *testing.T) {
+ return func(t *testing.T) {
+ t.Helper()
+
+ wantLimit := limit
+ if wantLimit <= 0 {
+ wantLimit = mock.DefaultLimit
+ }
+ if wantLimit > mock.MaxLimit {
+ wantLimit = mock.MaxLimit
+ }
+ wantAddrsLen := len(wantAddrs)
+ var i int
+ var startKey string
+ for {
+ var wantNext string
+ start := i * wantLimit
+ end := (i + 1) * wantLimit
+ if end < wantAddrsLen {
+ wantNext = wantAddrs[end]
+ } else {
+ end = wantAddrsLen
+ }
+ testNodesResponse(t, handler, key, startKey, limit, NodesResponse{
+ Nodes: wantAddrs[start:end],
+ Next: wantNext,
+ })
+ if wantNext == "" {
+ break
+ }
+ startKey = wantNext
+ i++
+ }
+ }
+}
+
+// testStatusResponse validates a response made on url if it matches
+// the expected StatusResponse.
+func testStatusResponse(t *testing.T, handler http.Handler, url string, code int) {
+ t.Helper()
+
+ resp := httpGet(t, handler, url)
+
+ if resp.StatusCode != code {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, code)
+ }
+ if got := resp.Header.Get("Content-Type"); got != jsonContentType {
+ t.Errorf("got Content-Type header %q, want %q", got, jsonContentType)
+ }
+ var r StatusResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if r.Code != code {
+ t.Errorf("got response code %v, want %v", r.Code, code)
+ }
+ if r.Message != http.StatusText(code) {
+ t.Errorf("got response message %q, want %q", r.Message, http.StatusText(code))
+ }
+}
+
+// testKeysResponse validates response returned from handler on /api/keys
+// with node, start and limit options against KeysResponse.
+func testKeysResponse(t *testing.T, handler http.Handler, node, start string, limit int, want KeysResponse) {
+ t.Helper()
+
+ u, err := url.Parse("/api/keys")
+ if err != nil {
+ t.Fatal(err)
+ }
+ q := u.Query()
+ if node != "" {
+ q.Set("node", node)
+ }
+ if start != "" {
+ q.Set("start", start)
+ }
+ if limit != 0 {
+ q.Set("limit", strconv.Itoa(limit))
+ }
+ u.RawQuery = q.Encode()
+
+ resp := httpGet(t, handler, u.String())
+
+ if resp.StatusCode != http.StatusOK {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK)
+ }
+ if got := resp.Header.Get("Content-Type"); got != jsonContentType {
+ t.Errorf("got Content-Type header %q, want %q", got, jsonContentType)
+ }
+ var r KeysResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if fmt.Sprint(r.Keys) != fmt.Sprint(want.Keys) {
+ t.Errorf("got keys %v, want %v", r.Keys, want.Keys)
+ }
+ if r.Next != want.Next {
+ t.Errorf("got next %s, want %s", r.Next, want.Next)
+ }
+}
+
+// testNodesResponse validates response returned from handler on /api/nodes
+// with key, start and limit options against NodesResponse.
+func testNodesResponse(t *testing.T, handler http.Handler, key, start string, limit int, want NodesResponse) {
+ t.Helper()
+
+ u, err := url.Parse("/api/nodes")
+ if err != nil {
+ t.Fatal(err)
+ }
+ q := u.Query()
+ if key != "" {
+ q.Set("key", key)
+ }
+ if start != "" {
+ q.Set("start", start)
+ }
+ if limit != 0 {
+ q.Set("limit", strconv.Itoa(limit))
+ }
+ u.RawQuery = q.Encode()
+
+ resp := httpGet(t, handler, u.String())
+
+ if resp.StatusCode != http.StatusOK {
+ t.Errorf("got status code %v, want %v", resp.StatusCode, http.StatusOK)
+ }
+ if got := resp.Header.Get("Content-Type"); got != jsonContentType {
+ t.Errorf("got Content-Type header %q, want %q", got, jsonContentType)
+ }
+ var r NodesResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ t.Fatal(err)
+ }
+ if fmt.Sprint(r.Nodes) != fmt.Sprint(want.Nodes) {
+ t.Errorf("got nodes %v, want %v", r.Nodes, want.Nodes)
+ }
+ if r.Next != want.Next {
+ t.Errorf("got next %s, want %s", r.Next, want.Next)
+ }
+}
+
+// httpGet uses httptest recorder to provide a response on handler's url.
+func httpGet(t *testing.T, handler http.Handler, url string) (r *http.Response) {
+ t.Helper()
+
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ w := httptest.NewRecorder()
+ handler.ServeHTTP(w, req)
+ return w.Result()
+}
+
+// sortCaseInsensitive performs a case insensitive sort on a string slice.
+func sortCaseInsensitive(s []string) {
+ sort.Slice(s, func(i, j int) bool {
+ return strings.ToLower(s[i]) < strings.ToLower(s[j])
+ })
+}
diff --git a/swarm/storage/mock/explorer/headers_test.go b/swarm/storage/mock/explorer/headers_test.go
new file mode 100644
index 000000000..5b8e05ffd
--- /dev/null
+++ b/swarm/storage/mock/explorer/headers_test.go
@@ -0,0 +1,163 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package explorer
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
+)
+
+// TestHandler_CORSOrigin validates that the correct Access-Control-Allow-Origin
+// header is served with various allowed origin settings.
+func TestHandler_CORSOrigin(t *testing.T) {
+ notAllowedOrigin := "http://not-allowed-origin.com/"
+
+ for _, tc := range []struct {
+ name string
+ origins []string
+ }{
+ {
+ name: "no origin",
+ origins: nil,
+ },
+ {
+ name: "single origin",
+ origins: []string{"http://localhost/"},
+ },
+ {
+ name: "multiple origins",
+ origins: []string{"http://localhost/", "http://ethereum.org/"},
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ handler := NewHandler(mem.NewGlobalStore(), tc.origins)
+
+ origins := tc.origins
+ if origins == nil {
+ // handle the "no origin" test case
+ origins = []string{""}
+ }
+
+ for _, origin := range origins {
+ t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin))
+ t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin))
+ }
+
+ t.Run(fmt.Sprintf("get %q", notAllowedOrigin), newTestCORSOrigin(handler, notAllowedOrigin, ""))
+ t.Run(fmt.Sprintf("preflight %q", notAllowedOrigin), newTestCORSPreflight(handler, notAllowedOrigin, ""))
+ })
+ }
+
+ t.Run("wildcard", func(t *testing.T) {
+ handler := NewHandler(mem.NewGlobalStore(), []string{"*"})
+
+ for _, origin := range []string{
+ "http://example.com/",
+ "http://ethereum.org",
+ "http://localhost",
+ } {
+ t.Run(fmt.Sprintf("get %q", origin), newTestCORSOrigin(handler, origin, origin))
+ t.Run(fmt.Sprintf("preflight %q", origin), newTestCORSPreflight(handler, origin, origin))
+ }
+ })
+}
+
+// newTestCORSOrigin returns a test function that validates if wantOrigin CORS header is
+// served by the handler for a GET request.
+func newTestCORSOrigin(handler http.Handler, origin, wantOrigin string) func(t *testing.T) {
+ return func(t *testing.T) {
+ req, err := http.NewRequest(http.MethodGet, "/", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ req.Header.Set("Origin", origin)
+
+ w := httptest.NewRecorder()
+ handler.ServeHTTP(w, req)
+ resp := w.Result()
+
+ header := resp.Header.Get("Access-Control-Allow-Origin")
+ if header != wantOrigin {
+ t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin)
+ }
+ }
+}
+
+// newTestCORSPreflight returns a test function that validates if wantOrigin CORS header is
+// served by the handler for an OPTIONS CORS preflight request.
+func newTestCORSPreflight(handler http.Handler, origin, wantOrigin string) func(t *testing.T) {
+ return func(t *testing.T) {
+ req, err := http.NewRequest(http.MethodOptions, "/", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ req.Header.Set("Origin", origin)
+ req.Header.Set("Access-Control-Request-Method", "GET")
+
+ w := httptest.NewRecorder()
+ handler.ServeHTTP(w, req)
+ resp := w.Result()
+
+ header := resp.Header.Get("Access-Control-Allow-Origin")
+ if header != wantOrigin {
+ t.Errorf("got Access-Control-Allow-Origin header %q, want %q", header, wantOrigin)
+ }
+ }
+}
+
+// TestHandler_noCacheHeaders validates that no cache headers are server.
+func TestHandler_noCacheHeaders(t *testing.T) {
+ handler := NewHandler(mem.NewGlobalStore(), nil)
+
+ for _, tc := range []struct {
+ url string
+ }{
+ {
+ url: "/",
+ },
+ {
+ url: "/api/nodes",
+ },
+ {
+ url: "/api/keys",
+ },
+ } {
+ req, err := http.NewRequest(http.MethodGet, tc.url, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ w := httptest.NewRecorder()
+ handler.ServeHTTP(w, req)
+ resp := w.Result()
+
+ for header, want := range map[string]string{
+ "Cache-Control": "no-cache, no-store, must-revalidate",
+ "Pragma": "no-cache",
+ "Expires": "0",
+ } {
+ got := resp.Header.Get(header)
+ if got != want {
+ t.Errorf("got %q header %q for url %q, want %q", header, tc.url, got, want)
+ }
+ }
+ }
+}
diff --git a/swarm/storage/mock/explorer/swagger.yaml b/swarm/storage/mock/explorer/swagger.yaml
new file mode 100644
index 000000000..2c014e927
--- /dev/null
+++ b/swarm/storage/mock/explorer/swagger.yaml
@@ -0,0 +1,176 @@
+swagger: '2.0'
+info:
+ title: Swarm Global Store API
+ version: 0.1.0
+tags:
+ - name: Has Key
+ description: Checks if a Key is stored on a Node
+ - name: Keys
+ description: Lists Keys
+ - name: Nodes
+ description: Lists Node addresses
+
+paths:
+ '/api/has-key/{node}/{key}':
+ get:
+ tags:
+ - Has Key
+ summary: Checks if a Key is stored on a Node
+ operationId: hasKey
+ produces:
+ - application/json
+
+ parameters:
+ - name: node
+ in: path
+ required: true
+ type: string
+ format: hex-endoded
+ description: Node address.
+
+ - name: key
+ in: path
+ required: true
+ type: string
+ format: hex-endoded
+ description: Key.
+
+ responses:
+ '200':
+ description: Key is stored on Node
+ schema:
+ $ref: '#/definitions/Status'
+ '404':
+ description: Key is not stored on Node
+ schema:
+ $ref: '#/definitions/Status'
+ '500':
+ description: Internal Server Error
+ schema:
+ $ref: '#/definitions/Status'
+
+ '/api/keys':
+ get:
+ tags:
+ - Keys
+ summary: Lists Keys
+ operationId: keys
+ produces:
+ - application/json
+
+ parameters:
+ - name: start
+ in: query
+ required: false
+ type: string
+ format: hex-encoded Key
+ description: A Key as the starting point for the returned list. It is usually a value from the returned "next" field in the Keys repsonse.
+
+ - name: limit
+ in: query
+ required: false
+ type: integer
+ default: 100
+ minimum: 1
+ maximum: 1000
+ description: Limits the number of Keys returned in on response.
+
+ - name: node
+ in: query
+ required: false
+ type: string
+ format: hex-encoded Node address
+ description: If this parameter is provided, only Keys that are stored on this Node be returned in the response. If not, all known Keys will be returned.
+
+ responses:
+ '200':
+ description: List of Keys
+ schema:
+ $ref: '#/definitions/Keys'
+ '500':
+ description: Internal Server Error
+ schema:
+ $ref: '#/definitions/Status'
+
+ '/api/nodes':
+ get:
+ tags:
+ - Nodes
+ summary: Lists Node addresses
+ operationId: nodes
+ produces:
+ - application/json
+
+ parameters:
+ - name: start
+ in: query
+ required: false
+ type: string
+ format: hex-encoded Node address
+ description: A Node address as the starting point for the returned list. It is usually a value from the returned "next" field in the Nodes repsonse.
+
+ - name: limit
+ in: query
+ required: false
+ type: integer
+ default: 100
+ minimum: 1
+ maximum: 1000
+ description: Limits the number of Node addresses returned in on response.
+
+ - name: key
+ in: query
+ required: false
+ type: string
+ format: hex-encoded Key
+ description: If this parameter is provided, only addresses of Nodes that store this Key will be returned in the response. If not, all known Node addresses will be returned.
+
+ responses:
+ '200':
+ description: List of Node addresses
+ schema:
+ $ref: '#/definitions/Nodes'
+ '500':
+ description: Internal Server Error
+ schema:
+ $ref: '#/definitions/Status'
+
+definitions:
+
+ Status:
+ type: object
+ properties:
+ message:
+ type: string
+ description: HTTP Status Code name.
+ code:
+ type: integer
+ description: HTTP Status Code.
+
+ Keys:
+ type: object
+ properties:
+ keys:
+ type: array
+ description: A list of Keys.
+ items:
+ type: string
+ format: hex-encoded Key
+ next:
+ type: string
+ format: hex-encoded Key
+ description: If present, the next Key in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached.
+
+ Nodes:
+ type: object
+ properties:
+ nodes:
+ type: array
+ description: A list of Node addresses.
+ items:
+ type: string
+ format: hex-encoded Node address
+ next:
+ type: string
+ format: hex-encoded Node address
+ description: If present, the next Node address in listing. Can be passed as "start" query parameter to continue the listing. If not present, the end of the listing is reached.
diff --git a/swarm/storage/mock/mem/mem.go b/swarm/storage/mock/mem/mem.go
index 3a0a2beb8..38bf098df 100644
--- a/swarm/storage/mock/mem/mem.go
+++ b/swarm/storage/mock/mem/mem.go
@@ -25,6 +25,7 @@ import (
"encoding/json"
"io"
"io/ioutil"
+ "sort"
"sync"
"github.com/ethereum/go-ethereum/common"
@@ -34,16 +35,27 @@ import (
// GlobalStore stores all chunk data and also keys and node addresses relations.
// It implements mock.GlobalStore interface.
type GlobalStore struct {
- nodes map[string]map[common.Address]struct{}
- data map[string][]byte
- mu sync.Mutex
+ // holds a slice of keys per node
+ nodeKeys map[common.Address][][]byte
+ // holds which key is stored on which nodes
+ keyNodes map[string][]common.Address
+ // all node addresses
+ nodes []common.Address
+ // all keys
+ keys [][]byte
+ // all keys data
+ data map[string][]byte
+ mu sync.RWMutex
}
// NewGlobalStore creates a new instance of GlobalStore.
func NewGlobalStore() *GlobalStore {
return &GlobalStore{
- nodes: make(map[string]map[common.Address]struct{}),
- data: make(map[string][]byte),
+ nodeKeys: make(map[common.Address][][]byte),
+ keyNodes: make(map[string][]common.Address),
+ nodes: make([]common.Address, 0),
+ keys: make([][]byte, 0),
+ data: make(map[string][]byte),
}
}
@@ -56,10 +68,10 @@ func (s *GlobalStore) NewNodeStore(addr common.Address) *mock.NodeStore {
// Get returns chunk data if the chunk with key exists for node
// on address addr.
func (s *GlobalStore) Get(addr common.Address, key []byte) (data []byte, err error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+ s.mu.RLock()
+ defer s.mu.RUnlock()
- if _, ok := s.nodes[string(key)][addr]; !ok {
+ if _, has := s.nodeKeyIndex(addr, key); !has {
return nil, mock.ErrNotFound
}
@@ -75,11 +87,33 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
- if _, ok := s.nodes[string(key)]; !ok {
- s.nodes[string(key)] = make(map[common.Address]struct{})
+ if i, found := s.nodeKeyIndex(addr, key); !found {
+ s.nodeKeys[addr] = append(s.nodeKeys[addr], nil)
+ copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:])
+ s.nodeKeys[addr][i] = key
}
- s.nodes[string(key)][addr] = struct{}{}
+
+ if i, found := s.keyNodeIndex(key, addr); !found {
+ k := string(key)
+ s.keyNodes[k] = append(s.keyNodes[k], addr)
+ copy(s.keyNodes[k][i+1:], s.keyNodes[k][i:])
+ s.keyNodes[k][i] = addr
+ }
+
+ if i, found := s.nodeIndex(addr); !found {
+ s.nodes = append(s.nodes, addr)
+ copy(s.nodes[i+1:], s.nodes[i:])
+ s.nodes[i] = addr
+ }
+
+ if i, found := s.keyIndex(key); !found {
+ s.keys = append(s.keys, nil)
+ copy(s.keys[i+1:], s.keys[i:])
+ s.keys[i] = key
+ }
+
s.data[string(key)] = data
+
return nil
}
@@ -88,24 +122,177 @@ func (s *GlobalStore) Delete(addr common.Address, key []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
- var count int
- if _, ok := s.nodes[string(key)]; ok {
- delete(s.nodes[string(key)], addr)
- count = len(s.nodes[string(key)])
+ if i, has := s.nodeKeyIndex(addr, key); has {
+ s.nodeKeys[addr] = append(s.nodeKeys[addr][:i], s.nodeKeys[addr][i+1:]...)
}
- if count == 0 {
- delete(s.data, string(key))
+
+ k := string(key)
+ if i, on := s.keyNodeIndex(key, addr); on {
+ s.keyNodes[k] = append(s.keyNodes[k][:i], s.keyNodes[k][i+1:]...)
+ }
+
+ if len(s.nodeKeys[addr]) == 0 {
+ if i, found := s.nodeIndex(addr); found {
+ s.nodes = append(s.nodes[:i], s.nodes[i+1:]...)
+ }
+ }
+
+ if len(s.keyNodes[k]) == 0 {
+ if i, found := s.keyIndex(key); found {
+ s.keys = append(s.keys[:i], s.keys[i+1:]...)
+ }
}
return nil
}
// HasKey returns whether a node with addr contains the key.
-func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool {
- s.mu.Lock()
- defer s.mu.Unlock()
+func (s *GlobalStore) HasKey(addr common.Address, key []byte) (yes bool) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
- _, ok := s.nodes[string(key)][addr]
- return ok
+ _, yes = s.nodeKeyIndex(addr, key)
+ return yes
+}
+
+// keyIndex returns the index of a key in keys slice.
+func (s *GlobalStore) keyIndex(key []byte) (index int, found bool) {
+ l := len(s.keys)
+ index = sort.Search(l, func(i int) bool {
+ return bytes.Compare(s.keys[i], key) >= 0
+ })
+ found = index < l && bytes.Equal(s.keys[index], key)
+ return index, found
+}
+
+// nodeIndex returns the index of a node address in nodes slice.
+func (s *GlobalStore) nodeIndex(addr common.Address) (index int, found bool) {
+ l := len(s.nodes)
+ index = sort.Search(l, func(i int) bool {
+ return bytes.Compare(s.nodes[i][:], addr[:]) >= 0
+ })
+ found = index < l && bytes.Equal(s.nodes[index][:], addr[:])
+ return index, found
+}
+
+// nodeKeyIndex returns the index of a key in nodeKeys slice.
+func (s *GlobalStore) nodeKeyIndex(addr common.Address, key []byte) (index int, found bool) {
+ l := len(s.nodeKeys[addr])
+ index = sort.Search(l, func(i int) bool {
+ return bytes.Compare(s.nodeKeys[addr][i], key) >= 0
+ })
+ found = index < l && bytes.Equal(s.nodeKeys[addr][index], key)
+ return index, found
+}
+
+// keyNodeIndex returns the index of a node address in keyNodes slice.
+func (s *GlobalStore) keyNodeIndex(key []byte, addr common.Address) (index int, found bool) {
+ k := string(key)
+ l := len(s.keyNodes[k])
+ index = sort.Search(l, func(i int) bool {
+ return bytes.Compare(s.keyNodes[k][i][:], addr[:]) >= 0
+ })
+ found = index < l && s.keyNodes[k][index] == addr
+ return index, found
+}
+
+// Keys returns a paginated list of keys on all nodes.
+func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ var i int
+ if startKey != nil {
+ i, _ = s.keyIndex(startKey)
+ }
+ total := len(s.keys)
+ max := maxIndex(i, limit, total)
+ keys.Keys = make([][]byte, 0, max-i)
+ for ; i < max; i++ {
+ keys.Keys = append(keys.Keys, append([]byte(nil), s.keys[i]...))
+ }
+ if total > max {
+ keys.Next = s.keys[max]
+ }
+ return keys, nil
+}
+
+// Nodes returns a paginated list of all known nodes.
+func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ var i int
+ if startAddr != nil {
+ i, _ = s.nodeIndex(*startAddr)
+ }
+ total := len(s.nodes)
+ max := maxIndex(i, limit, total)
+ nodes.Addrs = make([]common.Address, 0, max-i)
+ for ; i < max; i++ {
+ nodes.Addrs = append(nodes.Addrs, s.nodes[i])
+ }
+ if total > max {
+ nodes.Next = &s.nodes[max]
+ }
+ return nodes, nil
+}
+
+// NodeKeys returns a paginated list of keys on a node with provided address.
+func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ var i int
+ if startKey != nil {
+ i, _ = s.nodeKeyIndex(addr, startKey)
+ }
+ total := len(s.nodeKeys[addr])
+ max := maxIndex(i, limit, total)
+ keys.Keys = make([][]byte, 0, max-i)
+ for ; i < max; i++ {
+ keys.Keys = append(keys.Keys, append([]byte(nil), s.nodeKeys[addr][i]...))
+ }
+ if total > max {
+ keys.Next = s.nodeKeys[addr][max]
+ }
+ return keys, nil
+}
+
+// KeyNodes returns a paginated list of nodes that contain a particular key.
+func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ var i int
+ if startAddr != nil {
+ i, _ = s.keyNodeIndex(key, *startAddr)
+ }
+ total := len(s.keyNodes[string(key)])
+ max := maxIndex(i, limit, total)
+ nodes.Addrs = make([]common.Address, 0, max-i)
+ for ; i < max; i++ {
+ nodes.Addrs = append(nodes.Addrs, s.keyNodes[string(key)][i])
+ }
+ if total > max {
+ nodes.Next = &s.keyNodes[string(key)][max]
+ }
+ return nodes, nil
+}
+
+// maxIndex returns the end index for one page listing
+// based on the start index, limit and total number of elements.
+func maxIndex(start, limit, total int) (max int) {
+ if limit <= 0 {
+ limit = mock.DefaultLimit
+ }
+ if limit > mock.MaxLimit {
+ limit = mock.MaxLimit
+ }
+ max = total
+ if start+limit < max {
+ max = start + limit
+ }
+ return max
}
// Import reads tar archive from a reader that contains exported chunk data.
@@ -135,14 +322,26 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) {
return n, err
}
- addrs := make(map[common.Address]struct{})
- for _, a := range c.Addrs {
- addrs[a] = struct{}{}
+ key := common.Hex2Bytes(hdr.Name)
+ s.keyNodes[string(key)] = c.Addrs
+ for _, addr := range c.Addrs {
+ if i, has := s.nodeKeyIndex(addr, key); !has {
+ s.nodeKeys[addr] = append(s.nodeKeys[addr], nil)
+ copy(s.nodeKeys[addr][i+1:], s.nodeKeys[addr][i:])
+ s.nodeKeys[addr][i] = key
+ }
+ if i, found := s.nodeIndex(addr); !found {
+ s.nodes = append(s.nodes, addr)
+ copy(s.nodes[i+1:], s.nodes[i:])
+ s.nodes[i] = addr
+ }
}
-
- key := string(common.Hex2Bytes(hdr.Name))
- s.nodes[key] = addrs
- s.data[key] = c.Data
+ if i, found := s.keyIndex(key); !found {
+ s.keys = append(s.keys, nil)
+ copy(s.keys[i+1:], s.keys[i:])
+ s.keys[i] = key
+ }
+ s.data[string(key)] = c.Data
n++
}
return n, err
@@ -151,23 +350,18 @@ func (s *GlobalStore) Import(r io.Reader) (n int, err error) {
// Export writes to a writer a tar archive with all chunk data from
// the store. It returns the number of chunks exported and an error.
func (s *GlobalStore) Export(w io.Writer) (n int, err error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+ s.mu.RLock()
+ defer s.mu.RUnlock()
tw := tar.NewWriter(w)
defer tw.Close()
buf := bytes.NewBuffer(make([]byte, 0, 1024))
encoder := json.NewEncoder(buf)
- for key, addrs := range s.nodes {
- al := make([]common.Address, 0, len(addrs))
- for a := range addrs {
- al = append(al, a)
- }
-
+ for key, addrs := range s.keyNodes {
buf.Reset()
if err = encoder.Encode(mock.ExportedChunk{
- Addrs: al,
+ Addrs: addrs,
Data: s.data[key],
}); err != nil {
return n, err
diff --git a/swarm/storage/mock/mem/mem_test.go b/swarm/storage/mock/mem/mem_test.go
index adcefaabb..d39aaef45 100644
--- a/swarm/storage/mock/mem/mem_test.go
+++ b/swarm/storage/mock/mem/mem_test.go
@@ -28,6 +28,12 @@ func TestGlobalStore(t *testing.T) {
test.MockStore(t, NewGlobalStore(), 100)
}
+// TestGlobalStoreListings is running test for a GlobalStore
+// using test.MockStoreListings function.
+func TestGlobalStoreListings(t *testing.T) {
+ test.MockStoreListings(t, NewGlobalStore(), 1000)
+}
+
// TestImportExport is running tests for importing and
// exporting data between two GlobalStores
// using test.ImportExport function.
diff --git a/swarm/storage/mock/mock.go b/swarm/storage/mock/mock.go
index 626ba3fe1..586112a98 100644
--- a/swarm/storage/mock/mock.go
+++ b/swarm/storage/mock/mock.go
@@ -39,6 +39,17 @@ import (
"github.com/ethereum/go-ethereum/common"
)
+const (
+ // DefaultLimit should be used as default limit for
+ // Keys, Nodes, NodeKeys and KeyNodes GlobarStorer
+ // methids implementations.
+ DefaultLimit = 100
+ // MaxLimit should be used as the maximal returned number
+ // of items for Keys, Nodes, NodeKeys and KeyNodes GlobarStorer
+ // methids implementations, regardless of provided limit.
+ MaxLimit = 1000
+)
+
// ErrNotFound indicates that the chunk is not found.
var ErrNotFound = errors.New("not found")
@@ -76,6 +87,10 @@ func (n *NodeStore) Delete(key []byte) error {
return n.store.Delete(n.addr, key)
}
+func (n *NodeStore) Keys(startKey []byte, limit int) (keys Keys, err error) {
+ return n.store.NodeKeys(n.addr, startKey, limit)
+}
+
// GlobalStorer defines methods for mock db store
// that stores chunk data for all swarm nodes.
// It is used in tests to construct mock NodeStores
@@ -85,12 +100,28 @@ type GlobalStorer interface {
Put(addr common.Address, key []byte, data []byte) error
Delete(addr common.Address, key []byte) error
HasKey(addr common.Address, key []byte) bool
+ Keys(startKey []byte, limit int) (keys Keys, err error)
+ Nodes(startAddr *common.Address, limit int) (nodes Nodes, err error)
+ NodeKeys(addr common.Address, startKey []byte, limit int) (keys Keys, err error)
+ KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes Nodes, err error)
// NewNodeStore creates an instance of NodeStore
// to be used by a single swarm node with
// address addr.
NewNodeStore(addr common.Address) *NodeStore
}
+// Keys are returned results by Keys and NodeKeys GlobalStorer methods.
+type Keys struct {
+ Keys [][]byte
+ Next []byte
+}
+
+// Nodes are returned results by Nodes and KeyNodes GlobalStorer methods.
+type Nodes struct {
+ Addrs []common.Address
+ Next *common.Address
+}
+
// Importer defines method for importing mock store data
// from an exported tar archive.
type Importer interface {
diff --git a/swarm/storage/mock/rpc/rpc.go b/swarm/storage/mock/rpc/rpc.go
index 8cd6c83a7..8150ccff1 100644
--- a/swarm/storage/mock/rpc/rpc.go
+++ b/swarm/storage/mock/rpc/rpc.go
@@ -88,3 +88,27 @@ func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool {
}
return has
}
+
+// Keys returns a paginated list of keys on all nodes.
+func (s *GlobalStore) Keys(startKey []byte, limit int) (keys mock.Keys, err error) {
+ err = s.client.Call(&keys, "mockStore_keys", startKey, limit)
+ return keys, err
+}
+
+// Nodes returns a paginated list of all known nodes.
+func (s *GlobalStore) Nodes(startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ err = s.client.Call(&nodes, "mockStore_nodes", startAddr, limit)
+ return nodes, err
+}
+
+// NodeKeys returns a paginated list of keys on a node with provided address.
+func (s *GlobalStore) NodeKeys(addr common.Address, startKey []byte, limit int) (keys mock.Keys, err error) {
+ err = s.client.Call(&keys, "mockStore_nodeKeys", addr, startKey, limit)
+ return keys, err
+}
+
+// KeyNodes returns a paginated list of nodes that contain a particular key.
+func (s *GlobalStore) KeyNodes(key []byte, startAddr *common.Address, limit int) (nodes mock.Nodes, err error) {
+ err = s.client.Call(&nodes, "mockStore_keyNodes", key, startAddr, limit)
+ return nodes, err
+}
diff --git a/swarm/storage/mock/rpc/rpc_test.go b/swarm/storage/mock/rpc/rpc_test.go
index f62340ede..6c4652355 100644
--- a/swarm/storage/mock/rpc/rpc_test.go
+++ b/swarm/storage/mock/rpc/rpc_test.go
@@ -27,6 +27,27 @@ import (
// TestDBStore is running test for a GlobalStore
// using test.MockStore function.
func TestRPCStore(t *testing.T) {
+ store, cleanup := newTestStore(t)
+ defer cleanup()
+
+ test.MockStore(t, store, 30)
+}
+
+// TestRPCStoreListings is running test for a GlobalStore
+// using test.MockStoreListings function.
+func TestRPCStoreListings(t *testing.T) {
+ store, cleanup := newTestStore(t)
+ defer cleanup()
+
+ test.MockStoreListings(t, store, 1000)
+}
+
+// newTestStore creates a temporary GlobalStore
+// that will be closed when returned cleanup function
+// is called.
+func newTestStore(t *testing.T) (s *GlobalStore, cleanup func()) {
+ t.Helper()
+
serverStore := mem.NewGlobalStore()
server := rpc.NewServer()
@@ -35,7 +56,9 @@ func TestRPCStore(t *testing.T) {
}
store := NewGlobalStore(rpc.DialInProc(server))
- defer store.Close()
-
- test.MockStore(t, store, 30)
+ return store, func() {
+ if err := store.Close(); err != nil {
+ t.Error(err)
+ }
+ }
}
diff --git a/swarm/storage/mock/test/test.go b/swarm/storage/mock/test/test.go
index 69828b144..cc837f0b7 100644
--- a/swarm/storage/mock/test/test.go
+++ b/swarm/storage/mock/test/test.go
@@ -20,6 +20,7 @@ package test
import (
"bytes"
+ "encoding/binary"
"fmt"
"io"
"strconv"
@@ -170,6 +171,123 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) {
})
}
+// MockStoreListings tests global store methods Keys, Nodes, NodeKeys and KeyNodes.
+// It uses a provided globalstore to put chunks for n number of node addresses
+// and to validate that methods are returning the right responses.
+func MockStoreListings(t *testing.T, globalStore mock.GlobalStorer, n int) {
+ addrs := make([]common.Address, n)
+ for i := 0; i < n; i++ {
+ addrs[i] = common.HexToAddress(strconv.FormatInt(int64(i)+1, 16))
+ }
+ type chunk struct {
+ key []byte
+ data []byte
+ }
+ const chunksPerNode = 5
+ keys := make([][]byte, n*chunksPerNode)
+ for i := 0; i < n*chunksPerNode; i++ {
+ b := make([]byte, 8)
+ binary.BigEndian.PutUint64(b, uint64(i))
+ keys[i] = b
+ }
+
+ // keep track of keys on every node
+ nodeKeys := make(map[common.Address][][]byte)
+ // keep track of nodes that store particular key
+ keyNodes := make(map[string][]common.Address)
+ for i := 0; i < chunksPerNode; i++ {
+ // put chunks for every address
+ for j := 0; j < n; j++ {
+ addr := addrs[j]
+ key := keys[(i*n)+j]
+ err := globalStore.Put(addr, key, []byte("data"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ nodeKeys[addr] = append(nodeKeys[addr], key)
+ keyNodes[string(key)] = append(keyNodes[string(key)], addr)
+ }
+
+ // test Keys method
+ var startKey []byte
+ var gotKeys [][]byte
+ for {
+ keys, err := globalStore.Keys(startKey, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ gotKeys = append(gotKeys, keys.Keys...)
+ if keys.Next == nil {
+ break
+ }
+ startKey = keys.Next
+ }
+ wantKeys := keys[:(i+1)*n]
+ if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) {
+ t.Fatalf("got #%v keys %v, want %v", i+1, gotKeys, wantKeys)
+ }
+
+ // test Nodes method
+ var startNode *common.Address
+ var gotNodes []common.Address
+ for {
+ nodes, err := globalStore.Nodes(startNode, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ gotNodes = append(gotNodes, nodes.Addrs...)
+ if nodes.Next == nil {
+ break
+ }
+ startNode = nodes.Next
+ }
+ wantNodes := addrs
+ if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) {
+ t.Fatalf("got #%v nodes %v, want %v", i+1, gotNodes, wantNodes)
+ }
+
+ // test NodeKeys method
+ for addr, wantKeys := range nodeKeys {
+ var startKey []byte
+ var gotKeys [][]byte
+ for {
+ keys, err := globalStore.NodeKeys(addr, startKey, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ gotKeys = append(gotKeys, keys.Keys...)
+ if keys.Next == nil {
+ break
+ }
+ startKey = keys.Next
+ }
+ if fmt.Sprint(gotKeys) != fmt.Sprint(wantKeys) {
+ t.Fatalf("got #%v %s node keys %v, want %v", i+1, addr.Hex(), gotKeys, wantKeys)
+ }
+ }
+
+ // test KeyNodes method
+ for key, wantNodes := range keyNodes {
+ var startNode *common.Address
+ var gotNodes []common.Address
+ for {
+ nodes, err := globalStore.KeyNodes([]byte(key), startNode, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ gotNodes = append(gotNodes, nodes.Addrs...)
+ if nodes.Next == nil {
+ break
+ }
+ startNode = nodes.Next
+ }
+ if fmt.Sprint(gotNodes) != fmt.Sprint(wantNodes) {
+ t.Fatalf("got #%v %x key nodes %v, want %v", i+1, []byte(key), gotNodes, wantNodes)
+ }
+ }
+ }
+}
+
// ImportExport saves chunks to the outStore, exports them to the tar archive,
// imports tar archive to the inStore and checks if all chunks are imported correctly.
func ImportExport(t *testing.T, outStore, inStore mock.GlobalStorer, n int) {