swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)

* cmd/swarm: minor cli flag text adjustments

* swarm/api/http: sticky footer for swarm landing page using flex

* swarm/api/http: sticky footer for error pages and fix for multiple choices

* cmd/swarm, swarm/storage, swarm: fix  mingw on windows test issues

* cmd/swarm: update description of swarm cmd

* swarm: added network ID test

* cmd/swarm: support for smoke tests on the production swarm cluster

* cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion

* swarm: propagate ctx to internal apis (#754)

* swarm/metrics: collect disk measurements

* swarm/bmt: fix io.Writer interface

  * Write now tolerates arbitrary variable buffers
  * added variable buffer tests
  * Write loop and finalise optimisation
  * refactor / rename
  * add tests for empty input

* swarm/pss: (UPDATE) Generic notifications package (#744)

swarm/pss: Generic package for creating pss notification svcs

* swarm: Adding context to more functions

* swarm/api: change colour of landing page in templates

* swarm/api: change landing page to react to enter keypress
This commit is contained in:
Anton Evangelatov 2018-07-09 14:11:49 +02:00 committed by Balint Gabor
parent 30bdf817a0
commit b3711af051
49 changed files with 1630 additions and 488 deletions

View File

@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
// +build linux darwin freebsd
package main
import (

View File

@ -18,6 +18,7 @@
package main
import (
"context"
"fmt"
"os"
@ -39,7 +40,7 @@ func hash(ctx *cli.Context) {
stat, _ := f.Stat()
fileStore := storage.NewFileStore(storage.NewMapChunkStore(), storage.NewFileStoreParams())
addr, _, err := fileStore.Store(f, stat.Size(), false)
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
} else {

View File

@ -143,7 +143,7 @@ var (
}
SwarmWantManifestFlag = cli.BoolTFlag{
Name: "manifest",
Usage: "Automatic manifest upload",
Usage: "Automatic manifest upload (default true)",
}
SwarmUploadDefaultPath = cli.StringFlag{
Name: "defaultpath",
@ -155,7 +155,7 @@ var (
}
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "force mime type",
Usage: "Manually specify MIME type",
}
SwarmEncryptedFlag = cli.BoolFlag{
Name: "encrypt",

View File

@ -37,8 +37,14 @@ import (
)
func generateEndpoints(scheme string, cluster string, from int, to int) {
if cluster == "prod" {
cluster = ""
} else {
cluster = cluster + "."
}
for port := from; port <= to; port++ {
endpoints = append(endpoints, fmt.Sprintf("%s://%v.%s.swarm-gateways.net", scheme, port, cluster))
endpoints = append(endpoints, fmt.Sprintf("%s://%v.%sswarm-gateways.net", scheme, port, cluster))
}
if includeLocalhost {

View File

@ -58,11 +58,14 @@ func CollectProcessMetrics(refresh time.Duration) {
memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)
var diskReads, diskReadBytes, diskWrites, diskWriteBytes Meter
var diskReadBytesCounter, diskWriteBytesCounter Counter
if err := ReadDiskStats(diskstats[0]); err == nil {
diskReads = GetOrRegisterMeter("system/disk/readcount", DefaultRegistry)
diskReadBytes = GetOrRegisterMeter("system/disk/readdata", DefaultRegistry)
diskReadBytesCounter = GetOrRegisterCounter("system/disk/readbytes", DefaultRegistry)
diskWrites = GetOrRegisterMeter("system/disk/writecount", DefaultRegistry)
diskWriteBytes = GetOrRegisterMeter("system/disk/writedata", DefaultRegistry)
diskWriteBytesCounter = GetOrRegisterCounter("system/disk/writebytes", DefaultRegistry)
} else {
log.Debug("Failed to read disk metrics", "err", err)
}
@ -82,6 +85,9 @@ func CollectProcessMetrics(refresh time.Duration) {
diskReadBytes.Mark(diskstats[location1].ReadBytes - diskstats[location2].ReadBytes)
diskWrites.Mark(diskstats[location1].WriteCount - diskstats[location2].WriteCount)
diskWriteBytes.Mark(diskstats[location1].WriteBytes - diskstats[location2].WriteBytes)
diskReadBytesCounter.Inc(diskstats[location1].ReadBytes - diskstats[location2].ReadBytes)
diskWriteBytesCounter.Inc(diskstats[location1].WriteBytes - diskstats[location2].WriteBytes)
}
time.Sleep(refresh)
}

View File

@ -227,28 +227,28 @@ func NewAPI(fileStore *storage.FileStore, dns Resolver, resourceHandler *mru.Han
}
// Upload to be used only in TEST
func (a *API) Upload(uploadDir, index string, toEncrypt bool) (hash string, err error) {
func (a *API) Upload(ctx context.Context, uploadDir, index string, toEncrypt bool) (hash string, err error) {
fs := NewFileSystem(a)
hash, err = fs.Upload(uploadDir, index, toEncrypt)
return hash, err
}
// Retrieve FileStore reader API
func (a *API) Retrieve(addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) {
return a.fileStore.Retrieve(addr)
func (a *API) Retrieve(ctx context.Context, addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) {
return a.fileStore.Retrieve(ctx, addr)
}
// Store wraps the Store API call of the embedded FileStore
func (a *API) Store(data io.Reader, size int64, toEncrypt bool) (addr storage.Address, wait func(), err error) {
func (a *API) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr storage.Address, wait func(ctx context.Context) error, err error) {
log.Debug("api.store", "size", size)
return a.fileStore.Store(data, size, toEncrypt)
return a.fileStore.Store(ctx, data, size, toEncrypt)
}
// ErrResolve is returned when an URI cannot be resolved from ENS.
type ErrResolve error
// Resolve resolves a URI to an Address using the MultiResolver.
func (a *API) Resolve(uri *URI) (storage.Address, error) {
func (a *API) Resolve(ctx context.Context, uri *URI) (storage.Address, error) {
apiResolveCount.Inc(1)
log.Trace("resolving", "uri", uri.Addr)
@ -286,34 +286,37 @@ func (a *API) Resolve(uri *URI) (storage.Address, error) {
}
// Put provides singleton manifest creation on top of FileStore store
func (a *API) Put(content, contentType string, toEncrypt bool) (k storage.Address, wait func(), err error) {
func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
apiPutCount.Inc(1)
r := strings.NewReader(content)
key, waitContent, err := a.fileStore.Store(r, int64(len(content)), toEncrypt)
key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.fileStore.Store(r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
apiPutFail.Inc(1)
return nil, nil, err
}
return key, func() {
waitContent()
waitManifest()
return key, func(ctx context.Context) error {
err := waitContent(ctx)
if err != nil {
return err
}
return waitManifest(ctx)
}, nil
}
// Get uses iterative manifest retrieval and prefix matching
// to resolve basePath to content using FileStore retrieve
// it returns a section reader, mimeType, status, the key of the actual content and an error
func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.LazySectionReader, mimeType string, status int, contentAddr storage.Address, err error) {
func (a *API) Get(ctx context.Context, manifestAddr storage.Address, path string) (reader storage.LazySectionReader, mimeType string, status int, contentAddr storage.Address, err error) {
log.Debug("api.get", "key", manifestAddr, "path", path)
apiGetCount.Inc(1)
trie, err := loadManifest(a.fileStore, manifestAddr, nil)
trie, err := loadManifest(ctx, a.fileStore, manifestAddr, nil)
if err != nil {
apiGetNotFound.Inc(1)
status = http.StatusNotFound
@ -375,7 +378,7 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
log.Trace("resource is multihash", "key", manifestAddr)
// get the manifest the multihash digest points to
trie, err := loadManifest(a.fileStore, manifestAddr, nil)
trie, err := loadManifest(ctx, a.fileStore, manifestAddr, nil)
if err != nil {
apiGetNotFound.Inc(1)
status = http.StatusNotFound
@ -410,7 +413,7 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
}
mimeType = entry.ContentType
log.Debug("content lookup key", "key", contentAddr, "mimetype", mimeType)
reader, _ = a.fileStore.Retrieve(contentAddr)
reader, _ = a.fileStore.Retrieve(ctx, contentAddr)
} else {
// no entry found
status = http.StatusNotFound
@ -422,10 +425,10 @@ func (a *API) Get(manifestAddr storage.Address, path string) (reader storage.Laz
}
// Modify loads manifest and checks the content hash before recalculating and storing the manifest.
func (a *API) Modify(addr storage.Address, path, contentHash, contentType string) (storage.Address, error) {
func (a *API) Modify(ctx context.Context, addr storage.Address, path, contentHash, contentType string) (storage.Address, error) {
apiModifyCount.Inc(1)
quitC := make(chan bool)
trie, err := loadManifest(a.fileStore, addr, quitC)
trie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
apiModifyFail.Inc(1)
return nil, err
@ -449,7 +452,7 @@ func (a *API) Modify(addr storage.Address, path, contentHash, contentType string
}
// AddFile creates a new manifest entry, adds it to swarm, then adds a file to swarm.
func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bool) (storage.Address, string, error) {
func (a *API) AddFile(ctx context.Context, mhash, path, fname string, content []byte, nameresolver bool) (storage.Address, string, error) {
apiAddFileCount.Inc(1)
uri, err := Parse("bzz:/" + mhash)
@ -457,7 +460,7 @@ func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bo
apiAddFileFail.Inc(1)
return nil, "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
@ -476,13 +479,13 @@ func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bo
ModTime: time.Now(),
}
mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
}
fkey, err := mw.AddEntry(bytes.NewReader(content), entry)
fkey, err := mw.AddEntry(ctx, bytes.NewReader(content), entry)
if err != nil {
apiAddFileFail.Inc(1)
return nil, "", err
@ -496,11 +499,10 @@ func (a *API) AddFile(mhash, path, fname string, content []byte, nameresolver bo
}
return fkey, newMkey.String(), nil
}
// RemoveFile removes a file entry in a manifest.
func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string, error) {
func (a *API) RemoveFile(ctx context.Context, mhash string, path string, fname string, nameresolver bool) (string, error) {
apiRmFileCount.Inc(1)
uri, err := Parse("bzz:/" + mhash)
@ -508,7 +510,7 @@ func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string,
apiRmFileFail.Inc(1)
return "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiRmFileFail.Inc(1)
return "", err
@ -519,7 +521,7 @@ func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string,
path = path[1:]
}
mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiRmFileFail.Inc(1)
return "", err
@ -542,7 +544,7 @@ func (a *API) RemoveFile(mhash, path, fname string, nameresolver bool) (string,
}
// AppendFile removes old manifest, appends file entry to new manifest and adds it to Swarm.
func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content []byte, oldAddr storage.Address, offset int64, addSize int64, nameresolver bool) (storage.Address, string, error) {
func (a *API) AppendFile(ctx context.Context, mhash, path, fname string, existingSize int64, content []byte, oldAddr storage.Address, offset int64, addSize int64, nameresolver bool) (storage.Address, string, error) {
apiAppendFileCount.Inc(1)
buffSize := offset + addSize
@ -552,7 +554,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
buf := make([]byte, buffSize)
oldReader, _ := a.Retrieve(oldAddr)
oldReader, _ := a.Retrieve(ctx, oldAddr)
io.ReadAtLeast(oldReader, buf, int(offset))
newReader := bytes.NewReader(content)
@ -575,7 +577,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
apiAppendFileFail.Inc(1)
return nil, "", err
}
mkey, err := a.Resolve(uri)
mkey, err := a.Resolve(ctx, uri)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
@ -586,7 +588,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
path = path[1:]
}
mw, err := a.NewManifestWriter(mkey, nil)
mw, err := a.NewManifestWriter(ctx, mkey, nil)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
@ -606,7 +608,7 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
ModTime: time.Now(),
}
fkey, err := mw.AddEntry(io.Reader(combinedReader), entry)
fkey, err := mw.AddEntry(ctx, io.Reader(combinedReader), entry)
if err != nil {
apiAppendFileFail.Inc(1)
return nil, "", err
@ -620,23 +622,22 @@ func (a *API) AppendFile(mhash, path, fname string, existingSize int64, content
}
return fkey, newMkey.String(), nil
}
// BuildDirectoryTree used by swarmfs_unix
func (a *API) BuildDirectoryTree(mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {
func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver bool) (addr storage.Address, manifestEntryMap map[string]*manifestTrieEntry, err error) {
uri, err := Parse("bzz:/" + mhash)
if err != nil {
return nil, nil, err
}
addr, err = a.Resolve(uri)
addr, err = a.Resolve(ctx, uri)
if err != nil {
return nil, nil, err
}
quitC := make(chan bool)
rootTrie, err := loadManifest(a.fileStore, addr, quitC)
rootTrie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
return nil, nil, fmt.Errorf("can't load manifest %v: %v", addr.String(), err)
}
@ -725,8 +726,8 @@ func (a *API) ResourceIsValidated() bool {
}
// ResolveResourceManifest retrieves the Mutable Resource manifest for the given address, and returns the address of the metadata chunk.
func (a *API) ResolveResourceManifest(addr storage.Address) (storage.Address, error) {
trie, err := loadManifest(a.fileStore, addr, nil)
func (a *API) ResolveResourceManifest(ctx context.Context, addr storage.Address) (storage.Address, error) {
trie, err := loadManifest(ctx, a.fileStore, addr, nil)
if err != nil {
return nil, fmt.Errorf("cannot load resource manifest: %v", err)
}

View File

@ -85,7 +85,7 @@ func expResponse(content string, mimeType string, status int) *Response {
func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse {
addr := storage.Address(common.Hex2Bytes(bzzhash))
reader, mimeType, status, _, err := api.Get(addr, path)
reader, mimeType, status, _, err := api.Get(context.TODO(), addr, path)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -109,12 +109,15 @@ func TestApiPut(t *testing.T) {
testAPI(t, func(api *API, toEncrypt bool) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
// exp := expResponse([]byte(content), "text/plain", 0)
addr, wait, err := api.Put(content, exp.MimeType, toEncrypt)
ctx := context.TODO()
addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wait()
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
})
@ -226,7 +229,7 @@ func TestAPIResolve(t *testing.T) {
if x.immutable {
uri.Scheme = "bzz-immutable"
}
res, err := api.Resolve(uri)
res, err := api.Resolve(context.TODO(), uri)
if err == nil {
if x.expectErr != nil {
t.Fatalf("expected error %q, got result %q", x.expectErr, res)

View File

@ -18,6 +18,7 @@ package api
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
@ -113,12 +114,13 @@ func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error
if err == nil {
stat, _ := f.Stat()
var hash storage.Address
var wait func()
hash, wait, err = fs.api.fileStore.Store(f, stat.Size(), toEncrypt)
var wait func(context.Context) error
ctx := context.TODO()
hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
if hash != nil {
list[i].Hash = hash.Hex()
}
wait()
err = wait(ctx)
awg.Done()
if err == nil {
first512 := make([]byte, 512)
@ -189,7 +191,7 @@ func (fs *FileSystem) Download(bzzpath, localpath string) error {
if err != nil {
return err
}
addr, err := fs.api.Resolve(uri)
addr, err := fs.api.Resolve(context.TODO(), uri)
if err != nil {
return err
}
@ -200,7 +202,7 @@ func (fs *FileSystem) Download(bzzpath, localpath string) error {
}
quitC := make(chan bool)
trie, err := loadManifest(fs.api.fileStore, addr, quitC)
trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC)
if err != nil {
log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
return err
@ -273,7 +275,7 @@ func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.
if err != nil {
return err
}
reader, _ := fileStore.Retrieve(addr)
reader, _ := fileStore.Retrieve(context.TODO(), addr)
writer := bufio.NewWriter(f)
size, err := reader.Size(quitC)
if err != nil {

View File

@ -18,6 +18,7 @@ package api
import (
"bytes"
"context"
"io/ioutil"
"os"
"path/filepath"
@ -63,7 +64,7 @@ func TestApiDirUpload0(t *testing.T) {
checkResponse(t, resp, exp)
addr := storage.Address(common.Hex2Bytes(bzzhash))
_, _, _, _, err = api.Get(addr, "")
_, _, _, _, err = api.Get(context.TODO(), addr, "")
if err == nil {
t.Fatalf("expected error: %v", err)
}
@ -95,7 +96,7 @@ func TestApiDirUploadModify(t *testing.T) {
}
addr := storage.Address(common.Hex2Bytes(bzzhash))
addr, err = api.Modify(addr, "index.html", "", "")
addr, err = api.Modify(context.TODO(), addr, "index.html", "", "")
if err != nil {
t.Errorf("unexpected error: %v", err)
return
@ -105,18 +106,23 @@ func TestApiDirUploadModify(t *testing.T) {
t.Errorf("unexpected error: %v", err)
return
}
hash, wait, err := api.Store(bytes.NewReader(index), int64(len(index)), toEncrypt)
wait()
ctx := context.TODO()
hash, wait, err := api.Store(ctx, bytes.NewReader(index), int64(len(index)), toEncrypt)
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
addr, err = api.Modify(addr, "index2.html", hash.Hex(), "text/html; charset=utf-8")
err = wait(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
addr, err = api.Modify(addr, "img/logo.png", hash.Hex(), "text/html; charset=utf-8")
addr, err = api.Modify(context.TODO(), addr, "index2.html", hash.Hex(), "text/html; charset=utf-8")
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
addr, err = api.Modify(context.TODO(), addr, "img/logo.png", hash.Hex(), "text/html; charset=utf-8")
if err != nil {
t.Errorf("unexpected error: %v", err)
return
@ -137,7 +143,7 @@ func TestApiDirUploadModify(t *testing.T) {
exp = expResponse(content, "text/css", 0)
checkResponse(t, resp, exp)
_, _, _, _, err = api.Get(addr, "")
_, _, _, _, err = api.Get(context.TODO(), addr, "")
if err == nil {
t.Errorf("expected error: %v", err)
}

View File

@ -147,6 +147,14 @@ func Respond(w http.ResponseWriter, req *Request, msg string, code int) {
switch code {
case http.StatusInternalServerError:
log.Output(msg, log.LvlError, l.CallDepth, "ruid", req.ruid, "code", code)
case http.StatusMultipleChoices:
log.Output(msg, log.LvlDebug, l.CallDepth, "ruid", req.ruid, "code", code)
listURI := api.URI{
Scheme: "bzz-list",
Addr: req.uri.Addr,
Path: req.uri.Path,
}
additionalMessage = fmt.Sprintf(`<a href="/%s">multiple choices</a>`, listURI.String())
default:
log.Output(msg, log.LvlDebug, l.CallDepth, "ruid", req.ruid, "code", code)
}

File diff suppressed because one or more lines are too long

View File

@ -23,6 +23,7 @@ import (
"archive/tar"
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -120,7 +121,7 @@ type Request struct {
// HandlePostRaw handles a POST request to a raw bzz-raw:/ URI, stores the request
// body in swarm and returns the resulting storage address as a text/plain response
func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) {
func (s *Server) HandlePostRaw(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.post.raw", "ruid", r.ruid)
postRawCount.Inc(1)
@ -147,7 +148,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) {
Respond(w, r, "missing Content-Length header in request", http.StatusBadRequest)
return
}
addr, _, err := s.api.Store(r.Body, r.ContentLength, toEncrypt)
addr, _, err := s.api.Store(ctx, r.Body, r.ContentLength, toEncrypt)
if err != nil {
postRawFail.Inc(1)
Respond(w, r, err.Error(), http.StatusInternalServerError)
@ -166,7 +167,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) {
// (either a tar archive or multipart form), adds those files either to an
// existing manifest or to a new manifest under <path> and returns the
// resulting manifest hash as a text/plain response
func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) {
func (s *Server) HandlePostFiles(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.post.files", "ruid", r.ruid)
postFilesCount.Inc(1)
@ -184,7 +185,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) {
var addr storage.Address
if r.uri.Addr != "" && r.uri.Addr != "encrypt" {
addr, err = s.api.Resolve(r.uri)
addr, err = s.api.Resolve(ctx, r.uri)
if err != nil {
postFilesFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusInternalServerError)
@ -192,7 +193,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) {
}
log.Debug("resolved key", "ruid", r.ruid, "key", addr)
} else {
addr, err = s.api.NewManifest(toEncrypt)
addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
Respond(w, r, err.Error(), http.StatusInternalServerError)
@ -201,17 +202,17 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) {
log.Debug("new manifest", "ruid", r.ruid, "key", addr)
}
newAddr, err := s.updateManifest(addr, func(mw *api.ManifestWriter) error {
newAddr, err := s.updateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case "application/x-tar":
return s.handleTarUpload(r, mw)
return s.handleTarUpload(ctx, r, mw)
case "multipart/form-data":
return s.handleMultipartUpload(r, params["boundary"], mw)
return s.handleMultipartUpload(ctx, r, params["boundary"], mw)
default:
return s.handleDirectUpload(r, mw)
return s.handleDirectUpload(ctx, r, mw)
}
})
if err != nil {
@ -227,7 +228,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) {
fmt.Fprint(w, newAddr)
}
func (s *Server) handleTarUpload(req *Request, mw *api.ManifestWriter) error {
func (s *Server) handleTarUpload(ctx context.Context, req *Request, mw *api.ManifestWriter) error {
log.Debug("handle.tar.upload", "ruid", req.ruid)
tr := tar.NewReader(req.Body)
for {
@ -253,7 +254,7 @@ func (s *Server) handleTarUpload(req *Request, mw *api.ManifestWriter) error {
ModTime: hdr.ModTime,
}
log.Debug("adding path to new manifest", "ruid", req.ruid, "bytes", entry.Size, "path", entry.Path)
contentKey, err := mw.AddEntry(tr, entry)
contentKey, err := mw.AddEntry(ctx, tr, entry)
if err != nil {
return fmt.Errorf("error adding manifest entry from tar stream: %s", err)
}
@ -261,7 +262,7 @@ func (s *Server) handleTarUpload(req *Request, mw *api.ManifestWriter) error {
}
}
func (s *Server) handleMultipartUpload(req *Request, boundary string, mw *api.ManifestWriter) error {
func (s *Server) handleMultipartUpload(ctx context.Context, req *Request, boundary string, mw *api.ManifestWriter) error {
log.Debug("handle.multipart.upload", "ruid", req.ruid)
mr := multipart.NewReader(req.Body, boundary)
for {
@ -311,7 +312,7 @@ func (s *Server) handleMultipartUpload(req *Request, boundary string, mw *api.Ma
ModTime: time.Now(),
}
log.Debug("adding path to new manifest", "ruid", req.ruid, "bytes", entry.Size, "path", entry.Path)
contentKey, err := mw.AddEntry(reader, entry)
contentKey, err := mw.AddEntry(ctx, reader, entry)
if err != nil {
return fmt.Errorf("error adding manifest entry from multipart form: %s", err)
}
@ -319,9 +320,9 @@ func (s *Server) handleMultipartUpload(req *Request, boundary string, mw *api.Ma
}
}
func (s *Server) handleDirectUpload(req *Request, mw *api.ManifestWriter) error {
func (s *Server) handleDirectUpload(ctx context.Context, req *Request, mw *api.ManifestWriter) error {
log.Debug("handle.direct.upload", "ruid", req.ruid)
key, err := mw.AddEntry(req.Body, &api.ManifestEntry{
key, err := mw.AddEntry(ctx, req.Body, &api.ManifestEntry{
Path: req.uri.Path,
ContentType: req.Header.Get("Content-Type"),
Mode: 0644,
@ -338,18 +339,18 @@ func (s *Server) handleDirectUpload(req *Request, mw *api.ManifestWriter) error
// HandleDelete handles a DELETE request to bzz:/<manifest>/<path>, removes
// <path> from <manifest> and returns the resulting manifest hash as a
// text/plain response
func (s *Server) HandleDelete(w http.ResponseWriter, r *Request) {
func (s *Server) HandleDelete(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.delete", "ruid", r.ruid)
deleteCount.Inc(1)
key, err := s.api.Resolve(r.uri)
key, err := s.api.Resolve(ctx, r.uri)
if err != nil {
deleteFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusInternalServerError)
return
}
newKey, err := s.updateManifest(key, func(mw *api.ManifestWriter) error {
newKey, err := s.updateManifest(ctx, key, func(mw *api.ManifestWriter) error {
log.Debug(fmt.Sprintf("removing %s from manifest %s", r.uri.Path, key.Log()), "ruid", r.ruid)
return mw.RemoveEntry(r.uri.Path)
})
@ -399,7 +400,7 @@ func resourcePostMode(path string) (isRaw bool, frequency uint64, err error) {
// The resource name will be verbatim what is passed as the address part of the url.
// For example, if a POST is made to /bzz-resource:/foo.eth/raw/13 a new resource with frequency 13
// and name "foo.eth" will be created
func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) {
func (s *Server) HandlePostResource(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.post.resource", "ruid", r.ruid)
var err error
var addr storage.Address
@ -428,7 +429,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) {
// we create a manifest so we can retrieve the resource with bzz:// later
// this manifest has a special "resource type" manifest, and its hash is the key of the mutable resource
// root chunk
m, err := s.api.NewResourceManifest(addr.Hex())
m, err := s.api.NewResourceManifest(ctx, addr.Hex())
if err != nil {
Respond(w, r, fmt.Sprintf("failed to create resource manifest: %v", err), http.StatusInternalServerError)
return
@ -448,7 +449,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) {
// that means that we retrieve the manifest and inspect its Hash member.
manifestAddr := r.uri.Address()
if manifestAddr == nil {
manifestAddr, err = s.api.Resolve(r.uri)
manifestAddr, err = s.api.Resolve(ctx, r.uri)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -459,7 +460,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) {
}
// get the root chunk key from the manifest
addr, err = s.api.ResolveResourceManifest(manifestAddr)
addr, err = s.api.ResolveResourceManifest(ctx, manifestAddr)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("error resolving resource root chunk for %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -518,19 +519,19 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *Request) {
// bzz-resource://<id>/<n> - get latest update on period n
// bzz-resource://<id>/<n>/<m> - get update version m of period n
// <id> = ens name or hash
func (s *Server) HandleGetResource(w http.ResponseWriter, r *Request) {
s.handleGetResource(w, r)
func (s *Server) HandleGetResource(ctx context.Context, w http.ResponseWriter, r *Request) {
s.handleGetResource(ctx, w, r)
}
// TODO: Enable pass maxPeriod parameter
func (s *Server) handleGetResource(w http.ResponseWriter, r *Request) {
func (s *Server) handleGetResource(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.get.resource", "ruid", r.ruid)
var err error
// resolve the content key.
manifestAddr := r.uri.Address()
if manifestAddr == nil {
manifestAddr, err = s.api.Resolve(r.uri)
manifestAddr, err = s.api.Resolve(ctx, r.uri)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -541,7 +542,7 @@ func (s *Server) handleGetResource(w http.ResponseWriter, r *Request) {
}
// get the root chunk key from the manifest
key, err := s.api.ResolveResourceManifest(manifestAddr)
key, err := s.api.ResolveResourceManifest(ctx, manifestAddr)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("error resolving resource root chunk for %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -623,13 +624,13 @@ func (s *Server) translateResourceError(w http.ResponseWriter, r *Request, supEr
// given storage key
// - bzz-hash://<key> and responds with the hash of the content stored
// at the given storage key as a text/plain response
func (s *Server) HandleGet(w http.ResponseWriter, r *Request) {
func (s *Server) HandleGet(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.get", "ruid", r.ruid, "uri", r.uri)
getCount.Inc(1)
var err error
addr := r.uri.Address()
if addr == nil {
addr, err = s.api.Resolve(r.uri)
addr, err = s.api.Resolve(ctx, r.uri)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -644,7 +645,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) {
// if path is set, interpret <key> as a manifest and return the
// raw entry at the given path
if r.uri.Path != "" {
walker, err := s.api.NewManifestWalker(addr, nil)
walker, err := s.api.NewManifestWalker(ctx, addr, nil)
if err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("%s is not a manifest", addr), http.StatusBadRequest)
@ -692,7 +693,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) {
}
// check the root chunk exists by retrieving the file's size
reader, isEncrypted := s.api.Retrieve(addr)
reader, isEncrypted := s.api.Retrieve(ctx, addr)
if _, err := reader.Size(nil); err != nil {
getFail.Inc(1)
Respond(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound)
@ -721,7 +722,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) {
// HandleGetFiles handles a GET request to bzz:/<manifest> with an Accept
// header of "application/x-tar" and returns a tar stream of all files
// contained in the manifest
func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) {
func (s *Server) HandleGetFiles(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.get.files", "ruid", r.ruid, "uri", r.uri)
getFilesCount.Inc(1)
if r.uri.Path != "" {
@ -730,7 +731,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) {
return
}
addr, err := s.api.Resolve(r.uri)
addr, err := s.api.Resolve(ctx, r.uri)
if err != nil {
getFilesFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -738,7 +739,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) {
}
log.Debug("handle.get.files: resolved", "ruid", r.ruid, "key", addr)
walker, err := s.api.NewManifestWalker(addr, nil)
walker, err := s.api.NewManifestWalker(ctx, addr, nil)
if err != nil {
getFilesFail.Inc(1)
Respond(w, r, err.Error(), http.StatusInternalServerError)
@ -757,7 +758,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) {
}
// retrieve the entry's key and size
reader, isEncrypted := s.api.Retrieve(storage.Address(common.Hex2Bytes(entry.Hash)))
reader, isEncrypted := s.api.Retrieve(ctx, storage.Address(common.Hex2Bytes(entry.Hash)))
size, err := reader.Size(nil)
if err != nil {
return err
@ -797,7 +798,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) {
// HandleGetList handles a GET request to bzz-list:/<manifest>/<path> and returns
// a list of all files contained in <manifest> under <path> grouped into
// common prefixes using "/" as a delimiter
func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) {
func (s *Server) HandleGetList(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.get.list", "ruid", r.ruid, "uri", r.uri)
getListCount.Inc(1)
// ensure the root path has a trailing slash so that relative URLs work
@ -806,7 +807,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) {
return
}
addr, err := s.api.Resolve(r.uri)
addr, err := s.api.Resolve(ctx, r.uri)
if err != nil {
getListFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -814,7 +815,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) {
}
log.Debug("handle.get.list: resolved", "ruid", r.ruid, "key", addr)
list, err := s.getManifestList(addr, r.uri.Path)
list, err := s.getManifestList(ctx, addr, r.uri.Path)
if err != nil {
getListFail.Inc(1)
@ -845,8 +846,8 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) {
json.NewEncoder(w).Encode(&list)
}
func (s *Server) getManifestList(addr storage.Address, prefix string) (list api.ManifestList, err error) {
walker, err := s.api.NewManifestWalker(addr, nil)
func (s *Server) getManifestList(ctx context.Context, addr storage.Address, prefix string) (list api.ManifestList, err error) {
walker, err := s.api.NewManifestWalker(ctx, addr, nil)
if err != nil {
return
}
@ -903,7 +904,7 @@ func (s *Server) getManifestList(addr storage.Address, prefix string) (list api.
// HandleGetFile handles a GET request to bzz://<manifest>/<path> and responds
// with the content of the file at <path> from the given <manifest>
func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) {
func (s *Server) HandleGetFile(ctx context.Context, w http.ResponseWriter, r *Request) {
log.Debug("handle.get.file", "ruid", r.ruid)
getFileCount.Inc(1)
// ensure the root path has a trailing slash so that relative URLs work
@ -915,7 +916,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) {
manifestAddr := r.uri.Address()
if manifestAddr == nil {
manifestAddr, err = s.api.Resolve(r.uri)
manifestAddr, err = s.api.Resolve(ctx, r.uri)
if err != nil {
getFileFail.Inc(1)
Respond(w, r, fmt.Sprintf("cannot resolve %s: %s", r.uri.Addr, err), http.StatusNotFound)
@ -927,7 +928,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) {
log.Debug("handle.get.file: resolved", "ruid", r.ruid, "key", manifestAddr)
reader, contentType, status, contentKey, err := s.api.Get(manifestAddr, r.uri.Path)
reader, contentType, status, contentKey, err := s.api.Get(ctx, manifestAddr, r.uri.Path)
etag := common.Bytes2Hex(contentKey)
noneMatchEtag := r.Header.Get("If-None-Match")
@ -954,7 +955,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) {
//the request results in ambiguous files
//e.g. /read with readme.md and readinglist.txt available in manifest
if status == http.StatusMultipleChoices {
list, err := s.getManifestList(manifestAddr, r.uri.Path)
list, err := s.getManifestList(ctx, manifestAddr, r.uri.Path)
if err != nil {
getFileFail.Inc(1)
@ -1011,6 +1012,8 @@ func (b bufferedReadSeeker) Seek(offset int64, whence int) (int64, error) {
}
func (s *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
ctx := context.TODO()
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(time.Now())
req := &Request{Request: *r, ruid: uuid.New()[:8]}
metrics.GetOrRegisterCounter(fmt.Sprintf("http.request.%s", r.Method), nil).Inc(1)
@ -1055,16 +1058,16 @@ func (s *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
case "POST":
if uri.Raw() {
log.Debug("handlePostRaw")
s.HandlePostRaw(w, req)
s.HandlePostRaw(ctx, w, req)
} else if uri.Resource() {
log.Debug("handlePostResource")
s.HandlePostResource(w, req)
s.HandlePostResource(ctx, w, req)
} else if uri.Immutable() || uri.List() || uri.Hash() {
log.Debug("POST not allowed on immutable, list or hash")
Respond(w, req, fmt.Sprintf("POST method on scheme %s not allowed", uri.Scheme), http.StatusMethodNotAllowed)
} else {
log.Debug("handlePostFiles")
s.HandlePostFiles(w, req)
s.HandlePostFiles(ctx, w, req)
}
case "PUT":
@ -1076,31 +1079,31 @@ func (s *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
Respond(w, req, fmt.Sprintf("DELETE method to %s not allowed", uri), http.StatusBadRequest)
return
}
s.HandleDelete(w, req)
s.HandleDelete(ctx, w, req)
case "GET":
if uri.Resource() {
s.HandleGetResource(w, req)
s.HandleGetResource(ctx, w, req)
return
}
if uri.Raw() || uri.Hash() {
s.HandleGet(w, req)
s.HandleGet(ctx, w, req)
return
}
if uri.List() {
s.HandleGetList(w, req)
s.HandleGetList(ctx, w, req)
return
}
if r.Header.Get("Accept") == "application/x-tar" {
s.HandleGetFiles(w, req)
s.HandleGetFiles(ctx, w, req)
return
}
s.HandleGetFile(w, req)
s.HandleGetFile(ctx, w, req)
default:
Respond(w, req, fmt.Sprintf("%s method is not supported", r.Method), http.StatusMethodNotAllowed)
@ -1109,8 +1112,8 @@ func (s *Server) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
log.Info("served response", "ruid", req.ruid, "code", w.statusCode)
}
func (s *Server) updateManifest(addr storage.Address, update func(mw *api.ManifestWriter) error) (storage.Address, error) {
mw, err := s.api.NewManifestWriter(addr, nil)
func (s *Server) updateManifest(ctx context.Context, addr storage.Address, update func(mw *api.ManifestWriter) error) (storage.Address, error) {
mw, err := s.api.NewManifestWriter(ctx, addr, nil)
if err != nil {
return nil, err
}

View File

@ -18,6 +18,7 @@ package http
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
@ -382,15 +383,19 @@ func testBzzGetPath(encrypted bool, t *testing.T) {
for i, mf := range testmanifest {
reader[i] = bytes.NewReader([]byte(mf))
var wait func()
addr[i], wait, err = srv.FileStore.Store(reader[i], int64(len(mf)), encrypted)
var wait func(context.Context) error
ctx := context.TODO()
addr[i], wait, err = srv.FileStore.Store(ctx, reader[i], int64(len(mf)), encrypted)
for j := i + 1; j < len(testmanifest); j++ {
testmanifest[j] = strings.Replace(testmanifest[j], fmt.Sprintf("<key%v>", i), addr[i].Hex(), -1)
}
if err != nil {
t.Fatal(err)
}
wait()
err = wait(ctx)
if err != nil {
t.Fatal(err)
}
}
rootRef := addr[2].Hex()

File diff suppressed because one or more lines are too long

View File

@ -18,6 +18,7 @@ package api
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -61,20 +62,20 @@ type ManifestList struct {
}
// NewManifest creates and stores a new, empty manifest
func (a *API) NewManifest(toEncrypt bool) (storage.Address, error) {
func (a *API) NewManifest(ctx context.Context, toEncrypt bool) (storage.Address, error) {
var manifest Manifest
data, err := json.Marshal(&manifest)
if err != nil {
return nil, err
}
key, wait, err := a.Store(bytes.NewReader(data), int64(len(data)), toEncrypt)
wait()
key, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt)
wait(ctx)
return key, err
}
// Manifest hack for supporting Mutable Resource Updates from the bzz: scheme
// see swarm/api/api.go:API.Get() for more information
func (a *API) NewResourceManifest(resourceAddr string) (storage.Address, error) {
func (a *API) NewResourceManifest(ctx context.Context, resourceAddr string) (storage.Address, error) {
var manifest Manifest
entry := ManifestEntry{
Hash: resourceAddr,
@ -85,7 +86,7 @@ func (a *API) NewResourceManifest(resourceAddr string) (storage.Address, error)
if err != nil {
return nil, err
}
key, _, err := a.Store(bytes.NewReader(data), int64(len(data)), false)
key, _, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false)
return key, err
}
@ -96,8 +97,8 @@ type ManifestWriter struct {
quitC chan bool
}
func (a *API) NewManifestWriter(addr storage.Address, quitC chan bool) (*ManifestWriter, error) {
trie, err := loadManifest(a.fileStore, addr, quitC)
func (a *API) NewManifestWriter(ctx context.Context, addr storage.Address, quitC chan bool) (*ManifestWriter, error) {
trie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
return nil, fmt.Errorf("error loading manifest %s: %s", addr, err)
}
@ -105,9 +106,8 @@ func (a *API) NewManifestWriter(addr storage.Address, quitC chan bool) (*Manifes
}
// AddEntry stores the given data and adds the resulting key to the manifest
func (m *ManifestWriter) AddEntry(data io.Reader, e *ManifestEntry) (storage.Address, error) {
key, _, err := m.api.Store(data, e.Size, m.trie.encrypted)
func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (storage.Address, error) {
key, _, err := m.api.Store(ctx, data, e.Size, m.trie.encrypted)
if err != nil {
return nil, err
}
@ -136,8 +136,8 @@ type ManifestWalker struct {
quitC chan bool
}
func (a *API) NewManifestWalker(addr storage.Address, quitC chan bool) (*ManifestWalker, error) {
trie, err := loadManifest(a.fileStore, addr, quitC)
func (a *API) NewManifestWalker(ctx context.Context, addr storage.Address, quitC chan bool) (*ManifestWalker, error) {
trie, err := loadManifest(ctx, a.fileStore, addr, quitC)
if err != nil {
return nil, fmt.Errorf("error loading manifest %s: %s", addr, err)
}
@ -204,10 +204,10 @@ type manifestTrieEntry struct {
subtrie *manifestTrie
}
func loadManifest(fileStore *storage.FileStore, hash storage.Address, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
func loadManifest(ctx context.Context, fileStore *storage.FileStore, hash storage.Address, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
log.Trace("manifest lookup", "key", hash)
// retrieve manifest via FileStore
manifestReader, isEncrypted := fileStore.Retrieve(hash)
manifestReader, isEncrypted := fileStore.Retrieve(ctx, hash)
log.Trace("reader retrieved", "key", hash)
return readManifest(manifestReader, hash, fileStore, isEncrypted, quitC)
}
@ -382,8 +382,12 @@ func (mt *manifestTrie) recalcAndStore() error {
}
sr := bytes.NewReader(manifest)
key, wait, err2 := mt.fileStore.Store(sr, int64(len(manifest)), mt.encrypted)
wait()
ctx := context.TODO()
key, wait, err2 := mt.fileStore.Store(ctx, sr, int64(len(manifest)), mt.encrypted)
if err2 != nil {
return err2
}
err2 = wait(ctx)
mt.ref = key
return err2
}
@ -391,7 +395,7 @@ func (mt *manifestTrie) recalcAndStore() error {
func (mt *manifestTrie) loadSubTrie(entry *manifestTrieEntry, quitC chan bool) (err error) {
if entry.subtrie == nil {
hash := common.Hex2Bytes(entry.Hash)
entry.subtrie, err = loadManifest(mt.fileStore, hash, quitC)
entry.subtrie, err = loadManifest(context.TODO(), mt.fileStore, hash, quitC)
entry.Hash = "" // might not match, should be recalculated
}
return

View File

@ -17,6 +17,7 @@
package api
import (
"context"
"path"
"github.com/ethereum/go-ethereum/swarm/storage"
@ -45,8 +46,8 @@ func NewStorage(api *API) *Storage {
// its content type
//
// DEPRECATED: Use the HTTP API instead
func (s *Storage) Put(content, contentType string, toEncrypt bool) (storage.Address, func(), error) {
return s.api.Put(content, contentType, toEncrypt)
func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) {
return s.api.Put(ctx, content, contentType, toEncrypt)
}
// Get retrieves the content from bzzpath and reads the response in full
@ -57,16 +58,16 @@ func (s *Storage) Put(content, contentType string, toEncrypt bool) (storage.Addr
// size is resp.Size
//
// DEPRECATED: Use the HTTP API instead
func (s *Storage) Get(bzzpath string) (*Response, error) {
func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) {
uri, err := Parse(path.Join("bzz:/", bzzpath))
if err != nil {
return nil, err
}
addr, err := s.api.Resolve(uri)
addr, err := s.api.Resolve(ctx, uri)
if err != nil {
return nil, err
}
reader, mimeType, status, _, err := s.api.Get(addr, uri.Path)
reader, mimeType, status, _, err := s.api.Get(ctx, addr, uri.Path)
if err != nil {
return nil, err
}
@ -87,16 +88,16 @@ func (s *Storage) Get(bzzpath string) (*Response, error) {
// and merge on to it. creating an entry w conentType (mime)
//
// DEPRECATED: Use the HTTP API instead
func (s *Storage) Modify(rootHash, path, contentHash, contentType string) (newRootHash string, err error) {
func (s *Storage) Modify(ctx context.Context, rootHash, path, contentHash, contentType string) (newRootHash string, err error) {
uri, err := Parse("bzz:/" + rootHash)
if err != nil {
return "", err
}
addr, err := s.api.Resolve(uri)
addr, err := s.api.Resolve(ctx, uri)
if err != nil {
return "", err
}
addr, err = s.api.Modify(addr, path, contentHash, contentType)
addr, err = s.api.Modify(ctx, addr, path, contentHash, contentType)
if err != nil {
return "", err
}

View File

@ -17,6 +17,7 @@
package api
import (
"context"
"testing"
)
@ -31,18 +32,22 @@ func TestStoragePutGet(t *testing.T) {
content := "hello"
exp := expResponse(content, "text/plain", 0)
// exp := expResponse([]byte(content), "text/plain", 0)
bzzkey, wait, err := api.Put(content, exp.MimeType, toEncrypt)
ctx := context.TODO()
bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = wait(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wait()
bzzhash := bzzkey.Hex()
// to check put against the API#Get
resp0 := testGet(t, api.api, bzzhash, "")
checkResponse(t, resp0, exp)
// check storage#Get
resp, err := api.Get(bzzhash)
resp, err := api.Get(context.TODO(), bzzhash)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -117,10 +117,7 @@ func NewTreePool(hasher BaseHasherFunc, segmentCount, capacity int) *TreePool {
zerohashes[0] = zeros
h := hasher()
for i := 1; i < depth; i++ {
h.Reset()
h.Write(zeros)
h.Write(zeros)
zeros = h.Sum(nil)
zeros = doHash(h, nil, zeros, zeros)
zerohashes[i] = zeros
}
return &TreePool{
@ -318,41 +315,19 @@ func (h *Hasher) Sum(b []byte) (r []byte) {
// * if sequential write is used (can read sections)
func (h *Hasher) sum(b []byte, release, section bool) (r []byte) {
t := h.bmt
h.finalise(section)
if t.offset > 0 { // get the last node (double segment)
// padding the segment with zero
copy(t.segment[t.offset:], h.pool.zerohashes[0])
}
if section {
if t.cur%2 == 1 {
// if just finished current segment, copy it to the right half of the chunk
copy(t.section[h.pool.SegmentSize:], t.segment)
} else {
// copy segment to front of section, zero pad the right half
copy(t.section, t.segment)
copy(t.section[h.pool.SegmentSize:], h.pool.zerohashes[0])
}
h.writeSection(t.cur, t.section)
} else {
// TODO: h.writeSegment(t.cur, t.segment)
panic("SegmentWriter not implemented")
}
bh := h.pool.hasher()
go h.writeSection(t.cur, t.section, true)
bmtHash := <-t.result
span := t.span
// fmt.Println(t.draw(bmtHash))
if release {
h.releaseTree()
}
// sha3(span + BMT(pure_chunk))
// b + sha3(span + BMT(pure_chunk))
if span == nil {
return bmtHash
return append(b, bmtHash...)
}
bh := h.pool.hasher()
bh.Reset()
bh.Write(span)
bh.Write(bmtHash)
return bh.Sum(b)
return doHash(bh, b, span, bmtHash)
}
// Hasher implements the SwarmHash interface
@ -367,37 +342,41 @@ func (h *Hasher) Write(b []byte) (int, error) {
return 0, nil
}
t := h.bmt
need := (h.pool.SegmentCount - t.cur) * h.pool.SegmentSize
if l < need {
need = l
secsize := 2 * h.pool.SegmentSize
// calculate length of missing bit to complete current open section
smax := secsize - t.offset
// if at the beginning of chunk or middle of the section
if t.offset < secsize {
// fill up current segment from buffer
copy(t.section[t.offset:], b)
// if input buffer consumed and open section not complete, then
// advance offset and return
if smax == 0 {
smax = secsize
}
// calculate missing bit to complete current open segment
rest := h.pool.SegmentSize - t.offset
if need < rest {
rest = need
if l <= smax {
t.offset += l
return l, nil
}
copy(t.segment[t.offset:], b[:rest])
need -= rest
size := (t.offset + rest) % h.pool.SegmentSize
// read full segments and the last possibly partial segment
for need > 0 {
// push all finished chunks we read
if t.cur%2 == 0 {
copy(t.section, t.segment)
} else {
copy(t.section[h.pool.SegmentSize:], t.segment)
h.writeSection(t.cur, t.section)
if t.cur == h.pool.SegmentCount*2 {
return 0, nil
}
size = h.pool.SegmentSize
if need < size {
size = need
}
copy(t.segment, b[rest:rest+size])
need -= size
rest += size
// read full segments and the last possibly partial segment from the input buffer
for smax < l {
// section complete; push to tree asynchronously
go h.writeSection(t.cur, t.section, false)
// reset section
t.section = make([]byte, secsize)
// copy from imput buffer at smax to right half of section
copy(t.section, b[smax:])
// advance cursor
t.cur++
// smax here represents successive offsets in the input buffer
smax += secsize
}
t.offset = size % h.pool.SegmentSize
t.offset = l - smax + secsize
return l, nil
}
@ -426,6 +405,8 @@ func (h *Hasher) releaseTree() {
t.span = nil
t.hash = nil
h.bmt = nil
t.section = make([]byte, h.pool.SegmentSize*2)
t.segment = make([]byte, h.pool.SegmentSize)
h.pool.release(t)
}
}
@ -435,29 +416,37 @@ func (h *Hasher) releaseTree() {
// go h.run(h.bmt.leaves[i/2], h.pool.hasher(), i%2 == 0, s)
// }
// writeSection writes the hash of i/2-th segction into right level 1 node of the BMT tree
func (h *Hasher) writeSection(i int, section []byte) {
n := h.bmt.leaves[i/2]
// writeSection writes the hash of i-th section into level 1 node of the BMT tree
func (h *Hasher) writeSection(i int, section []byte, final bool) {
// select the leaf node for the section
n := h.bmt.leaves[i]
isLeft := n.isLeft
n = n.parent
bh := h.pool.hasher()
bh.Write(section)
go func() {
sum := bh.Sum(nil)
if n == nil {
h.bmt.result <- sum
return
// hash the section
s := doHash(bh, nil, section)
// write hash into parent node
if final {
// for the last segment use writeFinalNode
h.writeFinalNode(1, n, bh, isLeft, s)
} else {
h.writeNode(n, bh, isLeft, s)
}
h.run(n, bh, isLeft, sum)
}()
}
// run pushes the data to the node
// writeNode pushes the data to the node
// if it is the first of 2 sisters written the routine returns
// if it is the second, it calculates the hash and writes it
// to the parent node recursively
func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
func (h *Hasher) writeNode(n *node, bh hash.Hash, isLeft bool, s []byte) {
level := 1
for {
// at the root of the bmt just write the result to the result channel
if n == nil {
h.bmt.result <- s
return
}
// otherwise assign child hash to branc
if isLeft {
n.left = s
} else {
@ -467,44 +456,68 @@ func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
if n.toggle() {
return
}
// the second thread now can be sure both left and right children are written
// it calculates the hash of left|right and take it to the next level
bh.Reset()
bh.Write(n.left)
bh.Write(n.right)
s = bh.Sum(nil)
// at the root of the bmt just write the result to the result channel
if n.parent == nil {
h.bmt.result <- s
return
}
// otherwise iterate on parent
// the thread coming later now can be sure both left and right children are written
// it calculates the hash of left|right and pushes it to the parent
s = doHash(bh, nil, n.left, n.right)
isLeft = n.isLeft
n = n.parent
level++
}
}
// finalise is following the path starting from the final datasegment to the
// writeFinalNode is following the path starting from the final datasegment to the
// BMT root via parents
// for unbalanced trees it fills in the missing right sister nodes using
// the pool's lookup table for BMT subtree root hashes for all-zero sections
func (h *Hasher) finalise(skip bool) {
t := h.bmt
isLeft := t.cur%2 == 0
n := t.leaves[t.cur/2]
for level := 0; n != nil; level++ {
// when the final segment's path is going via left child node
// otherwise behaves like `writeNode`
func (h *Hasher) writeFinalNode(level int, n *node, bh hash.Hash, isLeft bool, s []byte) {
for {
// at the root of the bmt just write the result to the result channel
if n == nil {
if s != nil {
h.bmt.result <- s
}
return
}
var noHash bool
if isLeft {
// coming from left sister branch
// when the final section's path is going via left child node
// we include an all-zero subtree hash for the right level and toggle the node.
// when the path is going through right child node, nothing to do
if isLeft && !skip {
n.right = h.pool.zerohashes[level]
n.toggle()
if s != nil {
n.left = s
// if a left final node carries a hash, it must be the first (and only thread)
// so the toggle is already in passive state no need no call
// yet thread needs to carry on pushing hash to parent
} else {
// if again first thread then propagate nil and calculate no hash
noHash = n.toggle()
}
} else {
// right sister branch
// if s is nil, then thread arrived first at previous node and here there will be two,
// so no need to do anything
if s != nil {
n.right = s
noHash = n.toggle()
} else {
noHash = true
}
}
// the child-thread first arriving will just continue resetting s to nil
// the second thread now can be sure both left and right children are written
// it calculates the hash of left|right and pushes it to the parent
if noHash {
s = nil
} else {
s = doHash(bh, nil, n.left, n.right)
}
skip = false
isLeft = n.isLeft
n = n.parent
level++
}
}
@ -525,6 +538,15 @@ func (n *node) toggle() bool {
return atomic.AddInt32(&n.state, 1)%2 == 1
}
// calculates the hash of the data using hash.Hash
func doHash(h hash.Hash, b []byte, data ...[]byte) []byte {
h.Reset()
for _, v := range data {
h.Write(v)
}
return h.Sum(b)
}
func hashstr(b []byte) string {
end := len(b)
if end > 4 {

View File

@ -80,6 +80,5 @@ func (rh *RefHasher) hash(data []byte, length int) []byte {
}
rh.hasher.Reset()
rh.hasher.Write(section)
s := rh.hasher.Sum(nil)
return s
return rh.hasher.Sum(nil)
}

View File

@ -34,12 +34,12 @@ import (
// the actual data length generated (could be longer than max datalength of the BMT)
const BufferSize = 4128
var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}
// calculates the Keccak256 SHA3 hash of the data
func sha3hash(data ...[]byte) []byte {
h := sha3.NewKeccak256()
for _, v := range data {
h.Write(v)
}
return h.Sum(nil)
return doHash(h, nil, data...)
}
// TestRefHasher tests that the RefHasher computes the expected BMT hash for
@ -129,32 +129,49 @@ func TestRefHasher(t *testing.T) {
}
}
// tests if hasher responds with correct hash
func TestHasherEmptyData(t *testing.T) {
hasher := sha3.NewKeccak256
var data []byte
for _, count := range counts {
t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) {
pool := NewTreePool(hasher, count, PoolSize)
defer pool.Drain(0)
bmt := New(pool)
rbmt := NewRefHasher(hasher, count)
refHash := rbmt.Hash(data)
expHash := Hash(bmt, nil, data)
if !bytes.Equal(expHash, refHash) {
t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
}
})
}
}
func TestHasherCorrectness(t *testing.T) {
err := testHasher(testBaseHasher)
data := newData(BufferSize)
hasher := sha3.NewKeccak256
size := hasher().Size()
var err error
for _, count := range counts {
t.Run(fmt.Sprintf("segments_%v", count), func(t *testing.T) {
max := count * size
incr := 1
capacity := 1
pool := NewTreePool(hasher, count, capacity)
defer pool.Drain(0)
for n := 0; n <= max; n += incr {
incr = 1 + rand.Intn(5)
bmt := New(pool)
err = testHasherCorrectness(bmt, hasher, data, n, count)
if err != nil {
t.Fatal(err)
}
}
func testHasher(f func(BaseHasherFunc, []byte, int, int) error) error {
data := newData(BufferSize)
hasher := sha3.NewKeccak256
size := hasher().Size()
counts := []int{1, 2, 3, 4, 5, 8, 16, 32, 64, 128}
var err error
for _, count := range counts {
max := count * size
incr := 1
for n := 1; n <= max; n += incr {
err = f(hasher, data, n, count)
if err != nil {
return err
})
}
}
}
return nil
}
// Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize
func TestHasherReuse(t *testing.T) {
@ -215,12 +232,69 @@ LOOP:
}
}
// helper function that creates a tree pool
func testBaseHasher(hasher BaseHasherFunc, d []byte, n, count int) error {
pool := NewTreePool(hasher, count, 1)
// Tests BMT Hasher io.Writer interface is working correctly
// even multiple short random write buffers
func TestBMTHasherWriterBuffers(t *testing.T) {
hasher := sha3.NewKeccak256
for _, count := range counts {
t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) {
errc := make(chan error)
pool := NewTreePool(hasher, count, PoolSize)
defer pool.Drain(0)
n := count * 32
bmt := New(pool)
return testHasherCorrectness(bmt, hasher, d, n, count)
data := newData(n)
rbmt := NewRefHasher(hasher, count)
refHash := rbmt.Hash(data)
expHash := Hash(bmt, nil, data)
if !bytes.Equal(expHash, refHash) {
t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
}
attempts := 10
f := func() error {
bmt := New(pool)
bmt.Reset()
var buflen int
for offset := 0; offset < n; offset += buflen {
buflen = rand.Intn(n-offset) + 1
read, err := bmt.Write(data[offset : offset+buflen])
if err != nil {
return err
}
if read != buflen {
return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read)
}
}
hash := bmt.Sum(nil)
if !bytes.Equal(hash, expHash) {
return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash)
}
return nil
}
for j := 0; j < attempts; j++ {
go func() {
errc <- f()
}()
}
timeout := time.NewTimer(2 * time.Second)
for {
select {
case err := <-errc:
if err != nil {
t.Fatal(err)
}
attempts--
if attempts == 0 {
return
}
case <-timeout.C:
t.Fatalf("timeout")
}
}
})
}
}
// helper function that compares reference and optimised implementations on

View File

@ -84,7 +84,7 @@ func (sf *SwarmFile) Attr(ctx context.Context, a *fuse.Attr) error {
a.Gid = uint32(os.Getegid())
if sf.fileSize == -1 {
reader, _ := sf.mountInfo.swarmApi.Retrieve(sf.addr)
reader, _ := sf.mountInfo.swarmApi.Retrieve(ctx, sf.addr)
quitC := make(chan bool)
size, err := reader.Size(quitC)
if err != nil {
@ -104,7 +104,7 @@ func (sf *SwarmFile) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse
sf.lock.RLock()
defer sf.lock.RUnlock()
if sf.reader == nil {
sf.reader, _ = sf.mountInfo.swarmApi.Retrieve(sf.addr)
sf.reader, _ = sf.mountInfo.swarmApi.Retrieve(ctx, sf.addr)
}
buf := make([]byte, req.Size)
n, err := sf.reader.ReadAt(buf, req.Offset)

View File

@ -20,6 +20,7 @@ package fuse
import (
"bytes"
"context"
"crypto/rand"
"flag"
"fmt"
@ -110,7 +111,7 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.API, files map[strin
}
//upload directory to swarm and return hash
bzzhash, err := api.Upload(uploadDir, "", toEncrypt)
bzzhash, err := api.Upload(context.TODO(), uploadDir, "", toEncrypt)
if err != nil {
t.Fatalf("Error uploading directory %v: %vm encryption: %v", uploadDir, err, toEncrypt)
}

View File

@ -19,6 +19,7 @@
package fuse
import (
"context"
"errors"
"fmt"
"os"
@ -104,7 +105,7 @@ func (swarmfs *SwarmFS) Mount(mhash, mountpoint string) (*MountInfo, error) {
}
log.Trace("swarmfs mount: getting manifest tree")
_, manifestEntryMap, err := swarmfs.swarmApi.BuildDirectoryTree(mhash, true)
_, manifestEntryMap, err := swarmfs.swarmApi.BuildDirectoryTree(context.TODO(), mhash, true)
if err != nil {
return nil, err
}

View File

@ -47,7 +47,7 @@ func externalUnmount(mountPoint string) error {
}
func addFileToSwarm(sf *SwarmFile, content []byte, size int) error {
fkey, mhash, err := sf.mountInfo.swarmApi.AddFile(sf.mountInfo.LatestManifest, sf.path, sf.name, content, true)
fkey, mhash, err := sf.mountInfo.swarmApi.AddFile(context.TODO(), sf.mountInfo.LatestManifest, sf.path, sf.name, content, true)
if err != nil {
return err
}
@ -66,7 +66,7 @@ func addFileToSwarm(sf *SwarmFile, content []byte, size int) error {
}
func removeFileFromSwarm(sf *SwarmFile) error {
mkey, err := sf.mountInfo.swarmApi.RemoveFile(sf.mountInfo.LatestManifest, sf.path, sf.name, true)
mkey, err := sf.mountInfo.swarmApi.RemoveFile(context.TODO(), sf.mountInfo.LatestManifest, sf.path, sf.name, true)
if err != nil {
return err
}
@ -102,7 +102,7 @@ func removeDirectoryFromSwarm(sd *SwarmDir) error {
}
func appendToExistingFileInSwarm(sf *SwarmFile, content []byte, offset int64, length int64) error {
fkey, mhash, err := sf.mountInfo.swarmApi.AppendFile(sf.mountInfo.LatestManifest, sf.path, sf.name, sf.fileSize, content, sf.addr, offset, length, true)
fkey, mhash, err := sf.mountInfo.swarmApi.AppendFile(context.TODO(), sf.mountInfo.LatestManifest, sf.path, sf.name, sf.fileSize, content, sf.addr, offset, length, true)
if err != nil {
return err
}

View File

@ -81,6 +81,9 @@ func Setup(ctx *cli.Context) {
hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name)
)
// Start system runtime metrics collection
go gethmetrics.CollectProcessMetrics(2 * time.Second)
if enableExport {
log.Info("Enabling swarm metrics export to InfluxDB")
go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{

View File

@ -0,0 +1,266 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package network
import (
"bytes"
"context"
"flag"
"fmt"
"math/rand"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
)
var (
currentNetworkID int
cnt int
nodeMap map[int][]discover.NodeID
kademlias map[discover.NodeID]*Kademlia
)
const (
NumberOfNets = 4
MaxTimeout = 6
)
func init() {
flag.Parse()
rand.Seed(time.Now().Unix())
}
/*
Run the network ID test.
The test creates one simulations.Network instance,
a number of nodes, then connects nodes with each other in this network.
Each node gets a network ID assigned according to the number of networks.
Having more network IDs is just arbitrary in order to exclude
false positives.
Nodes should only connect with other nodes with the same network ID.
After the setup phase, the test checks on each node if it has the
expected node connections (excluding those not sharing the network ID).
*/
func TestNetworkID(t *testing.T) {
log.Debug("Start test")
//arbitrarily set the number of nodes. It could be any number
numNodes := 24
//the nodeMap maps all nodes (slice value) with the same network ID (key)
nodeMap = make(map[int][]discover.NodeID)
//set up the network and connect nodes
net, err := setupNetwork(numNodes)
if err != nil {
t.Fatalf("Error setting up network: %v", err)
}
defer func() {
//shutdown the snapshot network
log.Trace("Shutting down network")
net.Shutdown()
}()
//let's sleep to ensure all nodes are connected
time.Sleep(1 * time.Second)
//for each group sharing the same network ID...
for _, netIDGroup := range nodeMap {
log.Trace("netIDGroup size", "size", len(netIDGroup))
//...check that their size of the kademlia is of the expected size
//the assumption is that it should be the size of the group minus 1 (the node itself)
for _, node := range netIDGroup {
if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
}
kademlias[node].EachAddr(nil, 0, func(addr OverlayAddr, _ int, _ bool) bool {
found := false
for _, nd := range netIDGroup {
p := ToOverlayAddr(nd.Bytes())
if bytes.Equal(p, addr.Address()) {
found = true
}
}
if !found {
t.Fatalf("Expected node not found for node %s", node.String())
}
return true
})
}
}
log.Info("Test terminated successfully")
}
// setup simulated network with bzz/discovery and pss services.
// connects nodes in a circle
// if allowRaw is set, omission of builtin pss encryption is enabled (see PssParams)
func setupNetwork(numnodes int) (net *simulations.Network, err error) {
log.Debug("Setting up network")
quitC := make(chan struct{})
errc := make(chan error)
nodes := make([]*simulations.Node, numnodes)
if numnodes < 16 {
return nil, fmt.Errorf("Minimum sixteen nodes in network")
}
adapter := adapters.NewSimAdapter(newServices())
//create the network
net = simulations.NewNetwork(adapter, &simulations.NetworkConfig{
ID: "NetworkIdTestNet",
DefaultService: "bzz",
})
log.Debug("Creating networks and nodes")
var connCount int
//create nodes and connect them to each other
for i := 0; i < numnodes; i++ {
log.Trace("iteration: ", "i", i)
nodeconf := adapters.RandomNodeConfig()
nodes[i], err = net.NewNodeWithConfig(nodeconf)
if err != nil {
return nil, fmt.Errorf("error creating node %d: %v", i, err)
}
err = net.Start(nodes[i].ID())
if err != nil {
return nil, fmt.Errorf("error starting node %d: %v", i, err)
}
client, err := nodes[i].Client()
if err != nil {
return nil, fmt.Errorf("create node %d rpc client fail: %v", i, err)
}
//now setup and start event watching in order to know when we can upload
ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second)
defer watchCancel()
watchSubscriptionEvents(ctx, nodes[i].ID(), client, errc, quitC)
//on every iteration we connect to all previous ones
for k := i - 1; k >= 0; k-- {
connCount++
log.Debug(fmt.Sprintf("Connecting node %d with node %d; connection count is %d", i, k, connCount))
err = net.Connect(nodes[i].ID(), nodes[k].ID())
if err != nil {
if !strings.Contains(err.Error(), "already connected") {
return nil, fmt.Errorf("error connecting nodes: %v", err)
}
}
}
}
//now wait until the number of expected subscriptions has been finished
//`watchSubscriptionEvents` will write with a `nil` value to errc
for err := range errc {
if err != nil {
return nil, err
}
//`nil` received, decrement count
connCount--
log.Trace("count down", "cnt", connCount)
//all subscriptions received
if connCount == 0 {
close(quitC)
break
}
}
log.Debug("Network setup phase terminated")
return net, nil
}
func newServices() adapters.Services {
kademlias = make(map[discover.NodeID]*Kademlia)
kademlia := func(id discover.NodeID) *Kademlia {
if k, ok := kademlias[id]; ok {
return k
}
addr := NewAddrFromNodeID(id)
params := NewKadParams()
params.MinProxBinSize = 2
params.MaxBinSize = 3
params.MinBinSize = 1
params.MaxRetries = 1000
params.RetryExponent = 2
params.RetryInterval = 1000000
kademlias[id] = NewKademlia(addr.Over(), params)
return kademlias[id]
}
return adapters.Services{
"bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
addr := NewAddrFromNodeID(ctx.Config.ID)
hp := NewHiveParams()
hp.Discovery = false
cnt++
//assign the network ID
currentNetworkID = cnt % NumberOfNets
if ok := nodeMap[currentNetworkID]; ok == nil {
nodeMap[currentNetworkID] = make([]discover.NodeID, 0)
}
//add this node to the group sharing the same network ID
nodeMap[currentNetworkID] = append(nodeMap[currentNetworkID], ctx.Config.ID)
log.Debug("current network ID:", "id", currentNetworkID)
config := &BzzConfig{
OverlayAddr: addr.Over(),
UnderlayAddr: addr.Under(),
HiveParams: hp,
NetworkID: uint64(currentNetworkID),
}
return NewBzz(config, kademlia(ctx.Config.ID), nil, nil, nil), nil
},
}
}
func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) {
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
if err != nil {
log.Error(err.Error())
errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err)
return
}
go func() {
defer func() {
sub.Unsubscribe()
log.Trace("watch subscription events: unsubscribe", "id", id)
}()
for {
select {
case <-quitC:
return
case <-ctx.Done():
select {
case errc <- ctx.Err():
case <-quitC:
}
return
case e := <-events:
if e.Type == p2p.PeerEventTypeAdd {
errc <- nil
}
case err := <-sub.Err():
if err != nil {
select {
case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err):
case <-quitC:
}
return
}
}
}
}()
}

View File

@ -250,7 +250,7 @@ func (r *TestRegistry) APIs() []rpc.API {
}
func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
r, _ := fileStore.Retrieve(hash)
r, _ := fileStore.Retrieve(context.TODO(), hash)
buf := make([]byte, 1024)
var n int
var total int64

View File

@ -345,9 +345,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
// here we distribute chunks of a random file into Stores of nodes 1 to nodes
rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
size := chunkCount * chunkSize
fileHash, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
ctx := context.TODO()
fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
// wait until all chunks stored
wait()
if err != nil {
t.Fatal(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatal(err.Error())
}
@ -627,9 +631,13 @@ Loop:
hashes := make([]storage.Address, chunkCount)
for i := 0; i < chunkCount; i++ {
// create actual size real chunks
hash, wait, err := remoteFileStore.Store(io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
ctx := context.TODO()
hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false)
if err != nil {
b.Fatalf("expected no error. got %v", err)
}
// wait until all chunks stored
wait()
err = wait(ctx)
if err != nil {
b.Fatalf("expected no error. got %v", err)
}

View File

@ -117,8 +117,12 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams())
size := chunkCount * chunkSize
_, wait, err := fileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
wait()
ctx := context.TODO()
_, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
t.Fatal(err)
}
err = wait(ctx)
if err != nil {
t.Fatal(err)
}

View File

@ -410,7 +410,7 @@ func runFileRetrievalTest(nodeCount int) error {
fileStore := registries[id].fileStore
//check all chunks
for i, hash := range conf.hashes {
reader, _ := fileStore.Retrieve(hash)
reader, _ := fileStore.Retrieve(context.TODO(), hash)
//check that we can read the file size and that it corresponds to the generated file size
if s, err := reader.Size(nil); err != nil || s != int64(len(randomFiles[i])) {
allSuccess = false
@ -697,7 +697,7 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
fileStore := registries[id].fileStore
//check all chunks
for _, chnk := range conf.hashes {
reader, _ := fileStore.Retrieve(chnk)
reader, _ := fileStore.Retrieve(context.TODO(), chnk)
//assuming that reading the Size of the chunk is enough to know we found it
if s, err := reader.Size(nil); err != nil || s != chunkSize {
allSuccess = false
@ -765,9 +765,13 @@ func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string,
return nil, nil, err
}
//store it (upload it) on the FileStore
rk, wait, err := fileStore.Store(strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
ctx := context.TODO()
rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
log.Debug("Uploaded random string file to node")
wait()
if err != nil {
return nil, nil, err
}
err = wait(ctx)
if err != nil {
return nil, nil, err
}

View File

@ -581,8 +581,12 @@ func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.
fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
var rootAddrs []storage.Address
for i := 0; i < chunkCount; i++ {
rk, wait, err := fileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
wait()
ctx := context.TODO()
rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
return nil, err
}
err = wait(ctx)
if err != nil {
return nil, err
}

View File

@ -202,9 +202,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
// here we distribute chunks of a random file into stores 1...nodes
rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams())
size := chunkCount * chunkSize
_, wait, err := rrFileStore.Store(io.LimitReader(crand.Reader, int64(size)), int64(size), false)
_, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
if err != nil {
t.Fatal(err.Error())
}
// need to wait cos we then immediately collect the relevant bin content
wait()
wait(ctx)
if err != nil {
t.Fatal(err.Error())
}

View File

@ -508,14 +508,15 @@ func uploadFile(swarm *Swarm) (storage.Address, string, error) {
// File data is very short, but it is ensured that its
// uniqueness is very certain.
data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b)
k, wait, err := swarm.api.Put(data, "text/plain", false)
ctx := context.TODO()
k, wait, err := swarm.api.Put(ctx, data, "text/plain", false)
if err != nil {
return nil, "", err
}
if wait != nil {
wait()
err = wait(ctx)
}
return k, data, nil
return k, data, err
}
// retrieve is the function that is used for checking the availability of
@ -570,7 +571,7 @@ func retrieve(
log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", atomic.LoadUint64(totalFoundCount))
r, _, _, _, err := swarm.api.Get(f.addr, "/")
r, _, _, _, err := swarm.api.Get(context.TODO(), f.addr, "/")
if err != nil {
errc <- fmt.Errorf("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
return

View File

@ -385,7 +385,7 @@ func (ctl *HandshakeController) sendKey(pubkeyid string, topic *Topic, keycount
// generate new keys to send
for i := 0; i < len(recvkeyids); i++ {
var err error
recvkeyids[i], err = ctl.pss.generateSymmetricKey(*topic, to, true)
recvkeyids[i], err = ctl.pss.GenerateSymmetricKey(*topic, to, true)
if err != nil {
return []string{}, fmt.Errorf("set receive symkey fail (pubkey %x topic %x): %v", pubkeyid, topic, err)
}

394
swarm/pss/notify/notify.go Normal file
View File

@ -0,0 +1,394 @@
package notify
import (
"crypto/ecdsa"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/pss"
)
const (
// sent from requester to updater to request start of notifications
MsgCodeStart = iota
// sent from updater to requester, contains a notification plus a new symkey to replace the old
MsgCodeNotifyWithKey
// sent from updater to requester, contains a notification
MsgCodeNotify
// sent from requester to updater to request stop of notifications (currently unused)
MsgCodeStop
MsgCodeMax
)
const (
DefaultAddressLength = 1
symKeyLength = 32 // this should be gotten from source
)
var (
// control topic is used before symmetric key issuance completes
controlTopic = pss.Topic{0x00, 0x00, 0x00, 0x01}
)
// when code is MsgCodeStart, Payload is address
// when code is MsgCodeNotifyWithKey, Payload is notification | symkey
// when code is MsgCodeNotify, Payload is notification
// when code is MsgCodeStop, Payload is address
type Msg struct {
Code byte
Name []byte
Payload []byte
namestring string
}
// NewMsg creates a new notification message object
func NewMsg(code byte, name string, payload []byte) *Msg {
return &Msg{
Code: code,
Name: []byte(name),
Payload: payload,
namestring: name,
}
}
// NewMsgFromPayload decodes a serialized message payload into a new notification message object
func NewMsgFromPayload(payload []byte) (*Msg, error) {
msg := &Msg{}
err := rlp.DecodeBytes(payload, msg)
if err != nil {
return nil, err
}
msg.namestring = string(msg.Name)
return msg, nil
}
// a notifier has one sendBin entry for each address space it sends messages to
type sendBin struct {
address pss.PssAddress
symKeyId string
count int
}
// represents a single notification service
// only subscription address bins that match the address of a notification client have entries.
type notifier struct {
bins map[string]*sendBin
topic pss.Topic // identifies the resource for pss receiver
threshold int // amount of address bytes used in bins
updateC <-chan []byte
quitC chan struct{}
}
func (n *notifier) removeSubscription() {
n.quitC <- struct{}{}
}
// represents an individual subscription made by a public key at a specific address/neighborhood
type subscription struct {
pubkeyId string
address pss.PssAddress
handler func(string, []byte) error
}
// Controller is the interface to control, add and remove notification services and subscriptions
type Controller struct {
pss *pss.Pss
notifiers map[string]*notifier
subscriptions map[string]*subscription
mu sync.Mutex
}
// NewController creates a new Controller object
func NewController(ps *pss.Pss) *Controller {
ctrl := &Controller{
pss: ps,
notifiers: make(map[string]*notifier),
subscriptions: make(map[string]*subscription),
}
ctrl.pss.Register(&controlTopic, ctrl.Handler)
return ctrl
}
// IsActive is used to check if a notification service exists for a specified id string
// Returns true if exists, false if not
func (c *Controller) IsActive(name string) bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.isActive(name)
}
func (c *Controller) isActive(name string) bool {
_, ok := c.notifiers[name]
return ok
}
// Subscribe is used by a client to request notifications from a notification service provider
// It will create a MsgCodeStart message and send asymmetrically to the provider using its public key and routing address
// The handler function is a callback that will be called when notifications are received
// Fails if the request pss cannot be sent or if the update message could not be serialized
func (c *Controller) Subscribe(name string, pubkey *ecdsa.PublicKey, address pss.PssAddress, handler func(string, []byte) error) error {
c.mu.Lock()
defer c.mu.Unlock()
msg := NewMsg(MsgCodeStart, name, c.pss.BaseAddr())
c.pss.SetPeerPublicKey(pubkey, controlTopic, &address)
pubkeyId := hexutil.Encode(crypto.FromECDSAPub(pubkey))
smsg, err := rlp.EncodeToBytes(msg)
if err != nil {
return err
}
err = c.pss.SendAsym(pubkeyId, controlTopic, smsg)
if err != nil {
return err
}
c.subscriptions[name] = &subscription{
pubkeyId: pubkeyId,
address: address,
handler: handler,
}
return nil
}
// Unsubscribe, perhaps unsurprisingly, undoes the effects of Subscribe
// Fails if the subscription does not exist, if the request pss cannot be sent or if the update message could not be serialized
func (c *Controller) Unsubscribe(name string) error {
c.mu.Lock()
defer c.mu.Unlock()
sub, ok := c.subscriptions[name]
if !ok {
return fmt.Errorf("Unknown subscription '%s'", name)
}
msg := NewMsg(MsgCodeStop, name, sub.address)
smsg, err := rlp.EncodeToBytes(msg)
if err != nil {
return err
}
err = c.pss.SendAsym(sub.pubkeyId, controlTopic, smsg)
if err != nil {
return err
}
delete(c.subscriptions, name)
return nil
}
// NewNotifier is used by a notification service provider to create a new notification service
// It takes a name as identifier for the resource, a threshold indicating the granularity of the subscription address bin
// It then starts an event loop which listens to the supplied update channel and executes notifications on channel receives
// Fails if a notifier already is registered on the name
//func (c *Controller) NewNotifier(name string, threshold int, contentFunc func(string) ([]byte, error)) error {
func (c *Controller) NewNotifier(name string, threshold int, updateC <-chan []byte) (func(), error) {
c.mu.Lock()
if c.isActive(name) {
c.mu.Unlock()
return nil, fmt.Errorf("Notification service %s already exists in controller", name)
}
quitC := make(chan struct{})
c.notifiers[name] = &notifier{
bins: make(map[string]*sendBin),
topic: pss.BytesToTopic([]byte(name)),
threshold: threshold,
updateC: updateC,
quitC: quitC,
//contentFunc: contentFunc,
}
c.mu.Unlock()
go func() {
for {
select {
case <-quitC:
return
case data := <-updateC:
c.notify(name, data)
}
}
}()
return c.notifiers[name].removeSubscription, nil
}
// RemoveNotifier is used to stop a notification service.
// It cancels the event loop listening to the notification provider's update channel
func (c *Controller) RemoveNotifier(name string) error {
c.mu.Lock()
defer c.mu.Unlock()
currentNotifier, ok := c.notifiers[name]
if !ok {
return fmt.Errorf("Unknown notification service %s", name)
}
currentNotifier.removeSubscription()
delete(c.notifiers, name)
return nil
}
// Notify is called by a notification service provider to issue a new notification
// It takes the name of the notification service and the data to be sent.
// It fails if a notifier with this name does not exist or if data could not be serialized
// Note that it does NOT fail on failure to send a message
func (c *Controller) notify(name string, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.isActive(name) {
return fmt.Errorf("Notification service %s doesn't exist", name)
}
msg := NewMsg(MsgCodeNotify, name, data)
smsg, err := rlp.EncodeToBytes(msg)
if err != nil {
return err
}
for _, m := range c.notifiers[name].bins {
log.Debug("sending pss notify", "name", name, "addr", fmt.Sprintf("%x", m.address), "topic", fmt.Sprintf("%x", c.notifiers[name].topic), "data", data)
go func(m *sendBin) {
err = c.pss.SendSym(m.symKeyId, c.notifiers[name].topic, smsg)
if err != nil {
log.Warn("Failed to send notify to addr %x: %v", m.address, err)
}
}(m)
}
return nil
}
// check if we already have the bin
// if we do, retrieve the symkey from it and increment the count
// if we dont make a new symkey and a new bin entry
func (c *Controller) addToBin(ntfr *notifier, address []byte) (symKeyId string, pssAddress pss.PssAddress, err error) {
// parse the address from the message and truncate if longer than our bins threshold
if len(address) > ntfr.threshold {
address = address[:ntfr.threshold]
}
pssAddress = pss.PssAddress(address)
hexAddress := fmt.Sprintf("%x", address)
currentBin, ok := ntfr.bins[hexAddress]
if ok {
currentBin.count++
symKeyId = currentBin.symKeyId
} else {
symKeyId, err = c.pss.GenerateSymmetricKey(ntfr.topic, &pssAddress, false)
if err != nil {
return "", nil, err
}
ntfr.bins[hexAddress] = &sendBin{
address: address,
symKeyId: symKeyId,
count: 1,
}
}
return symKeyId, pssAddress, nil
}
func (c *Controller) handleStartMsg(msg *Msg, keyid string) (err error) {
keyidbytes, err := hexutil.Decode(keyid)
if err != nil {
return err
}
pubkey, err := crypto.UnmarshalPubkey(keyidbytes)
if err != nil {
return err
}
// if name is not registered for notifications we will not react
currentNotifier, ok := c.notifiers[msg.namestring]
if !ok {
return fmt.Errorf("Subscribe attempted on unknown resource '%s'", msg.namestring)
}
// add to or open new bin
symKeyId, pssAddress, err := c.addToBin(currentNotifier, msg.Payload)
if err != nil {
return err
}
// add to address book for send initial notify
symkey, err := c.pss.GetSymmetricKey(symKeyId)
if err != nil {
return err
}
err = c.pss.SetPeerPublicKey(pubkey, controlTopic, &pssAddress)
if err != nil {
return err
}
// TODO this is set to zero-length byte pending decision on protocol for initial message, whether it should include message or not, and how to trigger the initial message so that current state of MRU is sent upon subscription
notify := []byte{}
replyMsg := NewMsg(MsgCodeNotifyWithKey, msg.namestring, make([]byte, len(notify)+symKeyLength))
copy(replyMsg.Payload, notify)
copy(replyMsg.Payload[len(notify):], symkey)
sReplyMsg, err := rlp.EncodeToBytes(replyMsg)
if err != nil {
return err
}
return c.pss.SendAsym(keyid, controlTopic, sReplyMsg)
}
func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error {
symkey := msg.Payload[len(msg.Payload)-symKeyLength:]
topic := pss.BytesToTopic(msg.Name)
// \TODO keep track of and add actual address
updaterAddr := pss.PssAddress([]byte{})
c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true)
c.pss.Register(&topic, c.Handler)
return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength])
}
func (c *Controller) handleStopMsg(msg *Msg) error {
// if name is not registered for notifications we will not react
currentNotifier, ok := c.notifiers[msg.namestring]
if !ok {
return fmt.Errorf("Unsubscribe attempted on unknown resource '%s'", msg.namestring)
}
// parse the address from the message and truncate if longer than our bins' address length threshold
address := msg.Payload
if len(msg.Payload) > currentNotifier.threshold {
address = address[:currentNotifier.threshold]
}
// remove the entry from the bin if it exists, and remove the bin if it's the last remaining one
hexAddress := fmt.Sprintf("%x", address)
currentBin, ok := currentNotifier.bins[hexAddress]
if !ok {
return fmt.Errorf("found no active bin for address %s", hexAddress)
}
currentBin.count--
if currentBin.count == 0 { // if no more clients in this bin, remove it
delete(currentNotifier.bins, hexAddress)
}
return nil
}
// Handler is the pss topic handler to be used to process notification service messages
// It should be registered in the pss of both to any notification service provides and clients using the service
func (c *Controller) Handler(smsg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
c.mu.Lock()
defer c.mu.Unlock()
log.Debug("notify controller handler", "keyid", keyid)
// see if the message is valid
msg, err := NewMsgFromPayload(smsg)
if err != nil {
return err
}
switch msg.Code {
case MsgCodeStart:
return c.handleStartMsg(msg, keyid)
case MsgCodeNotifyWithKey:
return c.handleNotifyWithKeyMsg(msg)
case MsgCodeNotify:
return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload)
case MsgCodeStop:
return c.handleStopMsg(msg)
}
return fmt.Errorf("Invalid message code: %d", msg.Code)
}

View File

@ -0,0 +1,252 @@
package notify
import (
"bytes"
"context"
"flag"
"fmt"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/pss"
"github.com/ethereum/go-ethereum/swarm/state"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)
var (
loglevel = flag.Int("l", 3, "loglevel")
psses map[string]*pss.Pss
w *whisper.Whisper
wapi *whisper.PublicWhisperAPI
)
func init() {
flag.Parse()
hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
hf := log.LvlFilterHandler(log.Lvl(*loglevel), hs)
h := log.CallerFileHandler(hf)
log.Root().SetHandler(h)
w = whisper.New(&whisper.DefaultConfig)
wapi = whisper.NewPublicWhisperAPI(w)
psses = make(map[string]*pss.Pss)
}
// Creates a client node and notifier node
// Client sends pss notifications requests
// notifier sends initial notification with symmetric key, and
// second notification symmetrically encrypted
func TestStart(t *testing.T) {
adapter := adapters.NewSimAdapter(newServices(false))
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{
ID: "0",
DefaultService: "bzz",
})
leftNodeConf := adapters.RandomNodeConfig()
leftNodeConf.Services = []string{"bzz", "pss"}
leftNode, err := net.NewNodeWithConfig(leftNodeConf)
if err != nil {
t.Fatal(err)
}
err = net.Start(leftNode.ID())
if err != nil {
t.Fatal(err)
}
rightNodeConf := adapters.RandomNodeConfig()
rightNodeConf.Services = []string{"bzz", "pss"}
rightNode, err := net.NewNodeWithConfig(rightNodeConf)
if err != nil {
t.Fatal(err)
}
err = net.Start(rightNode.ID())
if err != nil {
t.Fatal(err)
}
err = net.Connect(rightNode.ID(), leftNode.ID())
if err != nil {
t.Fatal(err)
}
leftRpc, err := leftNode.Client()
if err != nil {
t.Fatal(err)
}
rightRpc, err := rightNode.Client()
if err != nil {
t.Fatal(err)
}
var leftAddr string
err = leftRpc.Call(&leftAddr, "pss_baseAddr")
if err != nil {
t.Fatal(err)
}
var rightAddr string
err = rightRpc.Call(&rightAddr, "pss_baseAddr")
if err != nil {
t.Fatal(err)
}
var leftPub string
err = leftRpc.Call(&leftPub, "pss_getPublicKey")
if err != nil {
t.Fatal(err)
}
var rightPub string
err = rightRpc.Call(&rightPub, "pss_getPublicKey")
if err != nil {
t.Fatal(err)
}
rsrcName := "foo.eth"
rsrcTopic := pss.BytesToTopic([]byte(rsrcName))
// wait for kademlia table to populate
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
rmsgC := make(chan *pss.APIMsg)
rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic)
if err != nil {
t.Fatal(err)
}
defer rightSub.Unsubscribe()
updateC := make(chan []byte)
updateMsg := []byte{}
ctrlClient := NewController(psses[rightPub])
ctrlNotifier := NewController(psses[leftPub])
ctrlNotifier.NewNotifier("foo.eth", 2, updateC)
pubkeybytes, err := hexutil.Decode(leftPub)
if err != nil {
t.Fatal(err)
}
pubkey, err := crypto.UnmarshalPubkey(pubkeybytes)
if err != nil {
t.Fatal(err)
}
addrbytes, err := hexutil.Decode(leftAddr)
if err != nil {
t.Fatal(err)
}
ctrlClient.Subscribe(rsrcName, pubkey, addrbytes, func(s string, b []byte) error {
if s != "foo.eth" || !bytes.Equal(updateMsg, b) {
t.Fatalf("unexpected result in client handler: '%s':'%x'", s, b)
}
log.Info("client handler receive", "s", s, "b", b)
return nil
})
var inMsg *pss.APIMsg
select {
case inMsg = <-rmsgC:
case <-ctx.Done():
t.Fatal(ctx.Err())
}
dMsg, err := NewMsgFromPayload(inMsg.Msg)
if err != nil {
t.Fatal(err)
}
if dMsg.namestring != rsrcName {
t.Fatalf("expected name '%s', got '%s'", rsrcName, dMsg.namestring)
}
if !bytes.Equal(dMsg.Payload[:len(updateMsg)], updateMsg) {
t.Fatalf("expected payload first %d bytes '%x', got '%x'", len(updateMsg), updateMsg, dMsg.Payload[:len(updateMsg)])
}
if len(updateMsg)+symKeyLength != len(dMsg.Payload) {
t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload))
}
rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic)
if err != nil {
t.Fatal(err)
}
defer rightSubUpdate.Unsubscribe()
updateMsg = []byte("plugh")
updateC <- updateMsg
select {
case inMsg = <-rmsgC:
case <-ctx.Done():
log.Error("timed out waiting for msg", "topic", fmt.Sprintf("%x", rsrcTopic))
t.Fatal(ctx.Err())
}
dMsg, err = NewMsgFromPayload(inMsg.Msg)
if err != nil {
t.Fatal(err)
}
if dMsg.namestring != rsrcName {
t.Fatalf("expected name %s, got %s", rsrcName, dMsg.namestring)
}
if !bytes.Equal(dMsg.Payload, updateMsg) {
t.Fatalf("expected payload '%x', got '%x'", updateMsg, dMsg.Payload)
}
}
func newServices(allowRaw bool) adapters.Services {
stateStore := state.NewInmemoryStore()
kademlias := make(map[discover.NodeID]*network.Kademlia)
kademlia := func(id discover.NodeID) *network.Kademlia {
if k, ok := kademlias[id]; ok {
return k
}
addr := network.NewAddrFromNodeID(id)
params := network.NewKadParams()
params.MinProxBinSize = 2
params.MaxBinSize = 3
params.MinBinSize = 1
params.MaxRetries = 1000
params.RetryExponent = 2
params.RetryInterval = 1000000
kademlias[id] = network.NewKademlia(addr.Over(), params)
return kademlias[id]
}
return adapters.Services{
"pss": func(ctx *adapters.ServiceContext) (node.Service, error) {
ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
keys, err := wapi.NewKeyPair(ctxlocal)
privkey, err := w.GetPrivateKey(keys)
pssp := pss.NewPssParams().WithPrivateKey(privkey)
pssp.MsgTTL = time.Second * 30
pssp.AllowRaw = allowRaw
pskad := kademlia(ctx.Config.ID)
ps, err := pss.NewPss(pskad, pssp)
if err != nil {
return nil, err
}
//psses[common.ToHex(crypto.FromECDSAPub(&privkey.PublicKey))] = ps
psses[hexutil.Encode(crypto.FromECDSAPub(&privkey.PublicKey))] = ps
return ps, nil
},
"bzz": func(ctx *adapters.ServiceContext) (node.Service, error) {
addr := network.NewAddrFromNodeID(ctx.Config.ID)
hp := network.NewHiveParams()
hp.Discovery = false
config := &network.BzzConfig{
OverlayAddr: addr.Over(),
UnderlayAddr: addr.Under(),
HiveParams: hp,
}
return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil
},
}
}

View File

@ -172,6 +172,8 @@ func (p *Protocol) Handle(msg []byte, peer *p2p.Peer, asymmetric bool, keyid str
rw, err := p.AddPeer(peer, *p.topic, asymmetric, keyid)
if err != nil {
return err
} else if rw == nil {
return fmt.Errorf("handle called on nil MsgReadWriter for new key " + keyid)
}
vrw = rw.(*PssReadWriter)
}
@ -181,8 +183,14 @@ func (p *Protocol) Handle(msg []byte, peer *p2p.Peer, asymmetric bool, keyid str
return fmt.Errorf("could not decode pssmsg")
}
if asymmetric {
if p.pubKeyRWPool[keyid] == nil {
return fmt.Errorf("handle called on nil MsgReadWriter for key " + keyid)
}
vrw = p.pubKeyRWPool[keyid].(*PssReadWriter)
} else {
if p.symKeyRWPool[keyid] == nil {
return fmt.Errorf("handle called on nil MsgReadWriter for key " + keyid)
}
vrw = p.symKeyRWPool[keyid].(*PssReadWriter)
}
vrw.injectMsg(pmsg)

View File

@ -41,7 +41,7 @@ import (
const (
defaultPaddingByteSize = 16
defaultMsgTTL = time.Second * 120
DefaultMsgTTL = time.Second * 120
defaultDigestCacheTTL = time.Second * 10
defaultSymKeyCacheCapacity = 512
digestLength = 32 // byte length of digest used for pss cache (currently same as swarm chunk hash)
@ -94,7 +94,7 @@ type PssParams struct {
// Sane defaults for Pss
func NewPssParams() *PssParams {
return &PssParams{
MsgTTL: defaultMsgTTL,
MsgTTL: DefaultMsgTTL,
CacheTTL: defaultDigestCacheTTL,
SymKeyCacheCapacity: defaultSymKeyCacheCapacity,
}
@ -354,11 +354,11 @@ func (p *Pss) handlePssMsg(msg interface{}) error {
}
if int64(pssmsg.Expire) < time.Now().Unix() {
metrics.GetOrRegisterCounter("pss.expire", nil).Inc(1)
log.Warn("pss filtered expired message", "from", fmt.Sprintf("%x", p.Overlay.BaseAddr()), "to", fmt.Sprintf("%x", common.ToHex(pssmsg.To)))
log.Warn("pss filtered expired message", "from", common.ToHex(p.Overlay.BaseAddr()), "to", common.ToHex(pssmsg.To))
return nil
}
if p.checkFwdCache(pssmsg) {
log.Trace(fmt.Sprintf("pss relay block-cache match (process): FROM %x TO %x", p.Overlay.BaseAddr(), common.ToHex(pssmsg.To)))
log.Trace("pss relay block-cache match (process)", "from", common.ToHex(p.Overlay.BaseAddr()), "to", (common.ToHex(pssmsg.To)))
return nil
}
p.addFwdCache(pssmsg)
@ -480,7 +480,7 @@ func (p *Pss) SetPeerPublicKey(pubkey *ecdsa.PublicKey, topic Topic, address *Ps
}
// Automatically generate a new symkey for a topic and address hint
func (p *Pss) generateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) {
func (p *Pss) GenerateSymmetricKey(topic Topic, address *PssAddress, addToCache bool) (string, error) {
keyid, err := p.w.GenerateSymKey()
if err != nil {
return "", err

View File

@ -470,7 +470,7 @@ func TestKeys(t *testing.T) {
}
// make a symmetric key that we will send to peer for encrypting messages to us
inkeyid, err := ps.generateSymmetricKey(topicobj, &addr, true)
inkeyid, err := ps.GenerateSymmetricKey(topicobj, &addr, true)
if err != nil {
t.Fatalf("failed to set 'our' incoming symmetric key")
}
@ -1296,7 +1296,7 @@ func benchmarkSymKeySend(b *testing.B) {
topic := BytesToTopic([]byte("foo"))
to := make(PssAddress, 32)
copy(to[:], network.RandomAddr().Over())
symkeyid, err := ps.generateSymmetricKey(topic, &to, true)
symkeyid, err := ps.GenerateSymmetricKey(topic, &to, true)
if err != nil {
b.Fatalf("could not generate symkey: %v", err)
}
@ -1389,7 +1389,7 @@ func benchmarkSymkeyBruteforceChangeaddr(b *testing.B) {
for i := 0; i < int(keycount); i++ {
to := make(PssAddress, 32)
copy(to[:], network.RandomAddr().Over())
keyid, err = ps.generateSymmetricKey(topic, &to, true)
keyid, err = ps.GenerateSymmetricKey(topic, &to, true)
if err != nil {
b.Fatalf("cant generate symkey #%d: %v", i, err)
}
@ -1471,7 +1471,7 @@ func benchmarkSymkeyBruteforceSameaddr(b *testing.B) {
topic := BytesToTopic([]byte("foo"))
for i := 0; i < int(keycount); i++ {
copy(addr[i], network.RandomAddr().Over())
keyid, err = ps.generateSymmetricKey(topic, &addr[i], true)
keyid, err = ps.GenerateSymmetricKey(topic, &addr[i], true)
if err != nil {
b.Fatalf("cant generate symkey #%d: %v", i, err)
}

View File

@ -16,6 +16,7 @@
package storage
import (
"context"
"encoding/binary"
"errors"
"fmt"
@ -126,7 +127,7 @@ type TreeChunker struct {
The chunks are not meant to be validated by the chunker when joining. This
is because it is left to the DPA to decide which sources are trusted.
*/
func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader {
func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader {
jp := &JoinerParams{
ChunkerParams: ChunkerParams{
chunkSize: DefaultChunkSize,
@ -137,14 +138,14 @@ func TreeJoin(addr Address, getter Getter, depth int) *LazyChunkReader {
depth: depth,
}
return NewTreeJoiner(jp).Join()
return NewTreeJoiner(jp).Join(ctx)
}
/*
When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
New chunks to store are store using the putter which the caller provides.
*/
func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(), err error) {
func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) (k Address, wait func(context.Context) error, err error) {
tsp := &TreeSplitterParams{
SplitterParams: SplitterParams{
ChunkerParams: ChunkerParams{
@ -156,7 +157,7 @@ func TreeSplit(data io.Reader, size int64, putter Putter) (k Address, wait func(
},
size: size,
}
return NewTreeSplitter(tsp).Split()
return NewTreeSplitter(tsp).Split(ctx)
}
func NewTreeJoiner(params *JoinerParams) *TreeChunker {
@ -224,7 +225,7 @@ func (tc *TreeChunker) decrementWorkerCount() {
tc.workerCount -= 1
}
func (tc *TreeChunker) Split() (k Address, wait func(), err error) {
func (tc *TreeChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
if tc.chunkSize <= 0 {
panic("chunker must be initialised")
}
@ -380,7 +381,7 @@ type LazyChunkReader struct {
getter Getter
}
func (tc *TreeChunker) Join() *LazyChunkReader {
func (tc *TreeChunker) Join(ctx context.Context) *LazyChunkReader {
return &LazyChunkReader{
key: tc.addr,
chunkSize: tc.chunkSize,

View File

@ -18,6 +18,7 @@ package storage
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"errors"
@ -81,7 +82,7 @@ func testRandomBrokenData(n int, tester *chunkerTester) {
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
expectedError := fmt.Errorf("Broken reader")
addr, _, err := TreeSplit(brokendata, int64(n), putGetter)
addr, _, err := TreeSplit(context.TODO(), brokendata, int64(n), putGetter)
if err == nil || err.Error() != expectedError.Error() {
tester.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
}
@ -104,20 +105,24 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
putGetter := newTestHasherStore(NewMapChunkStore(), hash)
var addr Address
var wait func()
var wait func(context.Context) error
var err error
ctx := context.TODO()
if usePyramid {
addr, wait, err = PyramidSplit(data, putGetter, putGetter)
addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter)
} else {
addr, wait, err = TreeSplit(data, int64(n), putGetter)
addr, wait, err = TreeSplit(ctx, data, int64(n), putGetter)
}
if err != nil {
tester.t.Fatalf(err.Error())
}
tester.t.Logf(" Key = %v\n", addr)
wait()
err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
reader := TreeJoin(addr, putGetter, 0)
reader := TreeJoin(context.TODO(), addr, putGetter, 0)
output := make([]byte, n)
r, err := reader.Read(output)
if r != n || err != io.EOF {
@ -200,11 +205,15 @@ func TestDataAppend(t *testing.T) {
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
addr, wait, err := PyramidSplit(data, putGetter, putGetter)
ctx := context.TODO()
addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
tester.t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
wait()
//create a append data stream
appendInput, found := tester.inputs[uint64(m)]
@ -217,13 +226,16 @@ func TestDataAppend(t *testing.T) {
}
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
newAddr, wait, err := PyramidAppend(addr, appendData, putGetter, putGetter)
newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter)
if err != nil {
tester.t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
wait()
reader := TreeJoin(newAddr, putGetter, 0)
reader := TreeJoin(ctx, newAddr, putGetter, 0)
newOutput := make([]byte, n+m)
r, err := reader.Read(newOutput)
if r != (n + m) {
@ -282,12 +294,16 @@ func benchmarkSplitJoin(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
key, wait, err := PyramidSplit(data, putGetter, putGetter)
ctx := context.TODO()
key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
wait()
reader := TreeJoin(key, putGetter, 0)
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
reader := TreeJoin(ctx, key, putGetter, 0)
benchReadAll(reader)
}
}
@ -298,7 +314,7 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
_, _, err := TreeSplit(data, int64(n), putGetter)
_, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@ -311,7 +327,7 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
_, _, err := TreeSplit(data, int64(n), putGetter)
_, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@ -324,7 +340,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
_, _, err := PyramidSplit(data, putGetter, putGetter)
_, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@ -338,7 +354,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
_, _, err := PyramidSplit(data, putGetter, putGetter)
_, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
@ -354,18 +370,25 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
key, wait, err := PyramidSplit(data, putGetter, putGetter)
ctx := context.TODO()
key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
wait()
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
_, wait, err = PyramidAppend(key, data1, putGetter, putGetter)
_, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
wait()
}
}

View File

@ -17,6 +17,7 @@
package storage
import (
"context"
"io"
)
@ -78,18 +79,18 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
// Chunk retrieval blocks on netStore requests with a timeout so reader will
// report error if retrieval of chunks within requested range time out.
// It returns a reader with the chunk data and whether the content was encrypted
func (f *FileStore) Retrieve(addr Address) (reader *LazyChunkReader, isEncrypted bool) {
func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) {
isEncrypted = len(addr) > f.hashFunc().Size()
getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted)
reader = TreeJoin(addr, getter, 0)
reader = TreeJoin(ctx, addr, getter, 0)
return
}
// Public API. Main entry point for document storage directly. Used by the
// FS-aware API and httpaccess
func (f *FileStore) Store(data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(), err error) {
func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) {
putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt)
return PyramidSplit(data, putter, putter)
return PyramidSplit(ctx, data, putter, putter)
}
func (f *FileStore) HashSize() int {

View File

@ -18,6 +18,7 @@ package storage
import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
@ -49,12 +50,16 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
defer os.RemoveAll("/tmp/bzz")
reader, slice := generateRandomData(testDataSize)
key, wait, err := fileStore.Store(reader, testDataSize, toEncrypt)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
if err != nil {
t.Errorf("Store error: %v", err)
}
wait()
resultReader, isEncrypted := fileStore.Retrieve(key)
err = wait(ctx)
if err != nil {
t.Fatalf("Store waitt error: %v", err.Error())
}
resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@ -72,7 +77,7 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
localStore.memStore = NewMemStore(NewDefaultStoreParams(), db)
resultReader, isEncrypted = fileStore.Retrieve(key)
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@ -110,12 +115,16 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
}
fileStore := NewFileStore(localStore, NewFileStoreParams())
reader, slice := generateRandomData(testDataSize)
key, wait, err := fileStore.Store(reader, testDataSize, toEncrypt)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
if err != nil {
t.Errorf("Store error: %v", err)
}
wait()
resultReader, isEncrypted := fileStore.Retrieve(key)
err = wait(ctx)
if err != nil {
t.Errorf("Store error: %v", err)
}
resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@ -134,7 +143,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
memStore.setCapacity(0)
// check whether it is, indeed, empty
fileStore.ChunkStore = memStore
resultReader, isEncrypted = fileStore.Retrieve(key)
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
@ -144,7 +153,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
// check how it works with localStore
fileStore.ChunkStore = localStore
// localStore.dbStore.setCapacity(0)
resultReader, isEncrypted = fileStore.Retrieve(key)
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}

View File

@ -17,6 +17,7 @@
package storage
import (
"context"
"fmt"
"sync"
@ -126,9 +127,10 @@ func (h *hasherStore) Close() {
// Wait returns when
// 1) the Close() function has been called and
// 2) all the chunks which has been Put has been stored
func (h *hasherStore) Wait() {
func (h *hasherStore) Wait(ctx context.Context) error {
<-h.closed
h.wg.Wait()
return nil
}
func (h *hasherStore) createHash(chunkData ChunkData) Address {

View File

@ -18,6 +18,7 @@ package storage
import (
"bytes"
"context"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
@ -60,7 +61,10 @@ func TestHasherStore(t *testing.T) {
hasherStore.Close()
// Wait until chunks are really stored
hasherStore.Wait()
err = hasherStore.Wait(context.TODO())
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
// Get the first chunk
retrievedChunkData1, err := hasherStore.Get(key1)

View File

@ -59,12 +59,12 @@ func newTestDbStore(mock bool, trusted bool) (*testDbStore, func(), error) {
}
cleanup := func() {
if err != nil {
if db != nil {
db.Close()
}
err = os.RemoveAll(dir)
if err != nil {
panic("db cleanup failed")
panic(fmt.Sprintf("db cleanup failed: %v", err))
}
}

View File

@ -17,6 +17,7 @@
package storage
import (
"context"
"encoding/binary"
"errors"
"io"
@ -99,12 +100,12 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get
When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
New chunks to store are store using the putter which the caller provides.
*/
func PyramidSplit(reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split()
func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, DefaultChunkSize)).Split(ctx)
}
func PyramidAppend(addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(), error) {
return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append()
func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) {
return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, DefaultChunkSize)).Append(ctx)
}
// Entry to create a tree node
@ -203,7 +204,7 @@ func (pc *PyramidChunker) decrementWorkerCount() {
pc.workerCount -= 1
}
func (pc *PyramidChunker) Split() (k Address, wait func(), err error) {
func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
log.Debug("pyramid.chunker: Split()")
pc.wg.Add(1)
@ -235,7 +236,7 @@ func (pc *PyramidChunker) Split() (k Address, wait func(), err error) {
}
func (pc *PyramidChunker) Append() (k Address, wait func(), err error) {
func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
log.Debug("pyramid.chunker: Append()")
// Load the right most unfinished tree chunks in every level
pc.loadTree()

View File

@ -18,6 +18,7 @@ package storage
import (
"bytes"
"context"
"crypto"
"crypto/rand"
"encoding/binary"
@ -303,7 +304,7 @@ type Putter interface {
// Close is to indicate that no more chunk data will be Put on this Putter
Close()
// Wait returns if all data has been store and the Close() was called.
Wait()
Wait(context.Context) error
}
// Getter is an interface to retrieve a chunk's data by its reference

View File

@ -17,10 +17,13 @@
package swarm
import (
"context"
"encoding/hex"
"io/ioutil"
"math/rand"
"os"
"path"
"runtime"
"strings"
"testing"
"time"
@ -42,6 +45,13 @@ func TestNewSwarm(t *testing.T) {
// a simple rpc endpoint for testing dialing
ipcEndpoint := path.Join(dir, "TestSwarm.ipc")
// windows namedpipes are not on filesystem but on NPFS
if runtime.GOOS == "windows" {
b := make([]byte, 8)
rand.Read(b)
ipcEndpoint = `\\.\pipe\TestSwarm-` + hex.EncodeToString(b)
}
_, server, err := rpc.StartIPCEndpoint(ipcEndpoint, nil)
if err != nil {
t.Error(err)
@ -338,15 +348,19 @@ func testLocalStoreAndRetrieve(t *testing.T, swarm *Swarm, n int, randomData boo
}
dataPut := string(slice)
k, wait, err := swarm.api.Store(strings.NewReader(dataPut), int64(len(dataPut)), false)
ctx := context.TODO()
k, wait, err := swarm.api.Store(ctx, strings.NewReader(dataPut), int64(len(dataPut)), false)
if err != nil {
t.Fatal(err)
}
if wait != nil {
wait()
err = wait(ctx)
if err != nil {
t.Fatal(err)
}
}
r, _ := swarm.api.Retrieve(k)
r, _ := swarm.api.Retrieve(context.TODO(), k)
d, err := ioutil.ReadAll(r)
if err != nil {