From ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1 Mon Sep 17 00:00:00 2001 From: Elad Date: Sun, 5 May 2019 22:34:22 +0400 Subject: [PATCH] swarm: push tags integration - request flow swarm/api: integrate tags to count chunks being split and stored swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package --- cmd/swarm/explore.go | 3 +- cmd/swarm/hash.go | 3 +- cmd/swarm/swarm-smoke/upload_and_sync.go | 2 +- swarm/api/api.go | 32 +---- swarm/api/api_test.go | 108 ++++++++++++--- swarm/api/client/client.go | 32 +++++ swarm/api/client/client_test.go | 21 ++- swarm/api/filesystem_test.go | 3 +- swarm/api/http/middleware.go | 49 +++++++ swarm/api/http/response.go | 2 +- swarm/api/http/server.go | 60 +++++++- swarm/api/http/server_test.go | 152 +++++++++++++++++---- swarm/api/http/test_server.go | 9 +- swarm/api/manifest_test.go | 7 +- swarm/api/storage.go | 85 ------------ swarm/api/storage_test.go | 56 -------- swarm/chunk/tag.go | 122 ++++++++--------- swarm/chunk/tag_test.go | 50 +++---- swarm/chunk/tags.go | 24 +++- swarm/chunk/tags_test.go | 48 +++++++ swarm/fuse/swarmfs_test.go | 5 +- swarm/network/stream/common_test.go | 2 +- swarm/network/stream/delivery_test.go | 2 +- swarm/network/stream/snapshot_sync_test.go | 2 +- swarm/network_test.go | 32 ++++- swarm/storage/chunker_test.go | 21 +-- swarm/storage/filestore.go | 33 +++-- swarm/storage/filestore_test.go | 7 +- swarm/storage/hasherstore.go | 10 +- swarm/storage/hasherstore_test.go | 2 +- swarm/storage/pyramid.go | 13 +- swarm/swarm.go | 5 +- swarm/swarm_test.go | 8 +- swarm/testutil/tag.go | 51 +++++++ 34 files changed, 699 insertions(+), 362 deletions(-) delete mode 100644 swarm/api/storage.go delete mode 100644 swarm/api/storage_test.go create mode 100644 swarm/chunk/tags_test.go create mode 100644 swarm/testutil/tag.go diff --git a/cmd/swarm/explore.go b/cmd/swarm/explore.go index 5b5b8bf41..9566213e4 100644 --- a/cmd/swarm/explore.go +++ b/cmd/swarm/explore.go @@ -23,6 +23,7 @@ import ( "os" "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -47,7 +48,7 @@ func hashes(ctx *cli.Context) { } defer f.Close() - fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) refs, err := fileStore.GetAllReferences(context.TODO(), f, false) if err != nil { utils.Fatalf("%v\n", err) diff --git a/cmd/swarm/hash.go b/cmd/swarm/hash.go index 2df02c0ed..ff786fa10 100644 --- a/cmd/swarm/hash.go +++ b/cmd/swarm/hash.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/contracts/ens" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "gopkg.in/urfave/cli.v1" ) @@ -77,7 +78,7 @@ func hash(ctx *cli.Context) { defer f.Close() stat, _ := f.Stat() - fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags()) addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false) if err != nil { utils.Fatalf("%v\n", err) diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index bbcf66b26..d6eb87ace 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -255,7 +255,7 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { return nil, fmt.Errorf("unable to create temp dir: %v", err) } defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags()) if err != nil { return nil, err } diff --git a/swarm/api/api.go b/swarm/api/api.go index 86c111923..96fb86e1c 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/contracts/ens" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" @@ -53,8 +54,6 @@ import ( var ( apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil) apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil) - apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil) - apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil) apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil) apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil) apiGetHTTP300 = metrics.NewRegisteredCounter("api.get.http.300", nil) @@ -188,15 +187,17 @@ type API struct { feed *feed.Handler fileStore *storage.FileStore dns Resolver + Tags *chunk.Tags Decryptor func(context.Context, string) DecryptFunc } // NewAPI the api constructor initialises a new API instance. -func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey) (self *API) { +func NewAPI(fileStore *storage.FileStore, dns Resolver, feedHandler *feed.Handler, pk *ecdsa.PrivateKey, tags *chunk.Tags) (self *API) { self = &API{ fileStore: fileStore, dns: dns, feed: feedHandler, + Tags: tags, Decryptor: func(ctx context.Context, credentials string) DecryptFunc { return self.doDecrypt(ctx, credentials, pk) }, @@ -297,31 +298,6 @@ func (a *API) ResolveURI(ctx context.Context, uri *URI, credentials string) (sto return addr, nil } -// Put provides singleton manifest creation on top of FileStore store -func (a *API) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { - apiPutCount.Inc(1) - r := strings.NewReader(content) - key, waitContent, err := a.fileStore.Store(ctx, r, int64(len(content)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) - r = strings.NewReader(manifest) - key, waitManifest, err := a.fileStore.Store(ctx, r, int64(len(manifest)), toEncrypt) - if err != nil { - apiPutFail.Inc(1) - return nil, nil, err - } - return key, func(ctx context.Context) error { - err := waitContent(ctx) - if err != nil { - return err - } - return waitManifest(ctx) - }, nil -} - // Get uses iterative manifest retrieval and prefix matching // to resolve basePath to content using FileStore retrieve // it returns a section reader, mimeType, status, the key of the actual content and an error diff --git a/swarm/api/api_test.go b/swarm/api/api_test.go index eb896f32a..4a5f92362 100644 --- a/swarm/api/api_test.go +++ b/swarm/api/api_test.go @@ -19,6 +19,7 @@ package api import ( "bytes" "context" + crand "crypto/rand" "errors" "flag" "fmt" @@ -26,13 +27,16 @@ import ( "io/ioutil" "math/big" "os" + "strings" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func init() { @@ -41,19 +45,21 @@ func init() { log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))) } -func testAPI(t *testing.T, f func(*API, bool)) { - datadir, err := ioutil.TempDir("", "bzz-test") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) +func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) { + for _, v := range []bool{true, false} { + datadir, err := ioutil.TempDir("", "bzz-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + tags := chunk.NewTags() + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags) + if err != nil { + return + } + api := NewAPI(fileStore, nil, nil, nil, tags) + f(api, tags, v) } - defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) - if err != nil { - return - } - api := NewAPI(fileStore, nil, nil, nil) - f(api, false) - f(api, true) } type testResponse struct { @@ -61,6 +67,13 @@ type testResponse struct { *Response } +type Response struct { + MimeType string + Status int + Size int64 + Content string +} + func checkResponse(t *testing.T, resp *testResponse, exp *Response) { if resp.MimeType != exp.MimeType { @@ -111,15 +124,14 @@ func testGet(t *testing.T, api *API, bzzhash, path string) *testResponse { } reader.Seek(0, 0) return &testResponse{reader, &Response{mimeType, status, size, string(s)}} - // return &testResponse{reader, &Response{mimeType, status, reader.Size(), nil}} } func TestApiPut(t *testing.T) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { content := "hello" exp := expResponse(content, "text/plain", 0) ctx := context.TODO() - addr, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) + addr, wait, err := putString(ctx, api, content, exp.MimeType, toEncrypt) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -129,6 +141,40 @@ func TestApiPut(t *testing.T) { } resp := testGet(t, api, addr.Hex(), "") checkResponse(t, resp, exp) + tag := tags.All()[0] + testutil.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest + }) +} + +// TestApiTagLarge tests that the the number of chunks counted is larger for a larger input +func TestApiTagLarge(t *testing.T) { + const contentLength = 4096 * 4095 + testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) { + randomContentReader := io.LimitReader(crand.Reader, int64(contentLength)) + tag, err := api.Tags.New("unnamed-tag", 0) + if err != nil { + t.Fatal(err) + } + ctx := sctx.SetTag(context.Background(), tag.Uid) + key, waitContent, err := api.Store(ctx, randomContentReader, int64(contentLength), toEncrypt) + if err != nil { + t.Fatal(err) + } + err = waitContent(ctx) + if err != nil { + t.Fatal(err) + } + tag.DoneSplit(key) + + if toEncrypt { + tag := tags.All()[0] + expect := int64(4095 + 64 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } else { + tag := tags.All()[0] + expect := int64(4095 + 32 + 1) + testutil.CheckTag(t, tag, expect, expect, 0, expect) + } }) } @@ -391,7 +437,7 @@ func TestDecryptOriginForbidden(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -425,7 +471,7 @@ func TestDecryptOrigin(t *testing.T) { Access: &AccessEntry{Type: AccessTypePass}, } - api := NewAPI(nil, nil, nil, nil) + api := NewAPI(nil, nil, nil, nil, chunk.NewTags()) f := api.Decryptor(ctx, "") err := f(me) @@ -500,3 +546,31 @@ func TestDetectContentType(t *testing.T) { }) } } + +// putString provides singleton manifest creation on top of api.API +func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + r := strings.NewReader(content) + tag, err := a.Tags.New("unnamed-tag", 0) + + log.Trace("created new tag", "uid", tag.Uid) + + cCtx := sctx.SetTag(ctx, tag.Uid) + key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt) + if err != nil { + return nil, nil, err + } + manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) + r = strings.NewReader(manifest) + key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt) + if err != nil { + return nil, nil, err + } + tag.DoneSplit(key) + return key, func(ctx context.Context) error { + err := waitContent(ctx) + if err != nil { + return err + } + return waitManifest(ctx) + }, nil +} diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go index 5e293cca7..9ad0948f4 100644 --- a/swarm/api/client/client.go +++ b/swarm/api/client/client.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/pborman/uuid" @@ -75,6 +76,8 @@ func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool) (string, err return "", err } req.ContentLength = size + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("raw_upload_%d", time.Now().Unix())) + res, err := http.DefaultClient.Do(req) if err != nil { return "", err @@ -111,6 +114,7 @@ func (c *Client) DownloadRaw(hash string) (io.ReadCloser, bool, error) { type File struct { io.ReadCloser api.ManifestEntry + Tag string } // Open opens a local file which can then be passed to client.Upload to upload @@ -139,6 +143,7 @@ func Open(path string) (*File, error) { Size: stat.Size(), ModTime: stat.ModTime(), }, + Tag: filepath.Base(path), }, nil } @@ -422,6 +427,7 @@ func (c *Client) List(hash, prefix, credentials string) (*api.ManifestList, erro // Uploader uploads files to swarm using a provided UploadFn type Uploader interface { Upload(UploadFn) error + Tag() string } type UploaderFunc func(UploadFn) error @@ -430,12 +436,23 @@ func (u UploaderFunc) Upload(upload UploadFn) error { return u(upload) } +func (u UploaderFunc) Tag() string { + return fmt.Sprintf("multipart_upload_%d", time.Now().Unix()) +} + +// DirectoryUploader implements Uploader +var _ Uploader = &DirectoryUploader{} + // DirectoryUploader uploads all files in a directory, optionally uploading // a file to the default path type DirectoryUploader struct { Dir string } +func (d *DirectoryUploader) Tag() string { + return filepath.Base(d.Dir) +} + // Upload performs the upload of the directory and default path func (d *DirectoryUploader) Upload(upload UploadFn) error { return filepath.Walk(d.Dir, func(path string, f os.FileInfo, err error) error { @@ -458,11 +475,17 @@ func (d *DirectoryUploader) Upload(upload UploadFn) error { }) } +var _ Uploader = &FileUploader{} + // FileUploader uploads a single file type FileUploader struct { File *File } +func (f *FileUploader) Tag() string { + return f.File.Tag +} + // Upload performs the upload of the file func (f *FileUploader) Upload(upload UploadFn) error { return upload(f.File) @@ -509,6 +532,14 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t req.URL.RawQuery = q.Encode() } + tag := uploader.Tag() + if tag == "" { + tag = "unnamed_tag_" + fmt.Sprintf("%d", time.Now().Unix()) + } + log.Trace("setting upload tag", "tag", tag) + + req.Header.Set(swarmhttp.SwarmTagHeaderName, tag) + // use 'Expect: 100-continue' so we don't send the request body if // the server refuses the request req.Header.Set("Expect", "100-continue") @@ -574,6 +605,7 @@ func (c *Client) MultipartUpload(hash string, uploader Uploader) (string, error) mw := multipart.NewWriter(reqW) req.Header.Set("Content-Type", fmt.Sprintf("multipart/form-data; boundary=%q", mw.Boundary())) + req.Header.Set(swarmhttp.SwarmTagHeaderName, fmt.Sprintf("multipart_upload_%d", time.Now().Unix())) // define an UploadFn which adds files to the multipart form uploadFn := func(file *File) error { diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 9c9bde5d6..92489849c 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -25,16 +25,14 @@ import ( "sort" "testing" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/swarm/api" swarmhttp "github.com/ethereum/go-ethereum/swarm/api/http" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" + "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" + "github.com/ethereum/go-ethereum/swarm/testutil" ) func serverFunc(api *api.API) swarmhttp.TestServer { @@ -68,6 +66,10 @@ func testClientUploadDownloadRaw(toEncrypt bool, t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 1, 1, 0, 1) + // check we can download the same data res, isEncrypted, err := client.DownloadRaw(hash) if err != nil { @@ -209,6 +211,10 @@ func TestClientUploadDownloadDirectory(t *testing.T) { t.Fatalf("error uploading directory: %s", err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 0, 9) + // check we can download the individual files checkDownloadFile := func(path string, expected []byte) { file, err := client.Download(hash, path) @@ -323,6 +329,7 @@ func TestClientMultipartUpload(t *testing.T) { defer srv.Close() // define an uploader which uploads testDirFiles with some data + // note: this test should result in SEEN chunks. assert accordingly data := []byte("some-data") uploader := UploaderFunc(func(upload UploadFn) error { for _, name := range testDirFiles { @@ -348,6 +355,10 @@ func TestClientMultipartUpload(t *testing.T) { t.Fatal(err) } + // check the tag was created successfully + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 9, 9, 7, 9) + // check we can download the individual files checkDownloadFile := func(path string) { file, err := client.Download(hash, path) diff --git a/swarm/api/filesystem_test.go b/swarm/api/filesystem_test.go index 02f5bff65..b8f37fdd5 100644 --- a/swarm/api/filesystem_test.go +++ b/swarm/api/filesystem_test.go @@ -25,13 +25,14 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) var testDownloadDir, _ = ioutil.TempDir(os.TempDir(), "bzz-test") func testFileSystem(t *testing.T, f func(*FileSystem, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { + testAPI(t, func(api *API, _ *chunk.Tags, toEncrypt bool) { f(NewFileSystem(api), toEncrypt) }) } diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 320da3046..e6e263f4c 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -86,6 +87,54 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler { }) } +// InitUploadTag creates a new tag for an upload to the local HTTP proxy +// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used +// when the Content-Length header is set, an ETA on chunking will be available since the +// number of chunks to be split is known in advance (not including enclosing manifest chunks) +// the tag can later be accessed using the appropriate identifier in the request context +func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var ( + tagName string + err error + estimatedTotal int64 = 0 + contentType = r.Header.Get("Content-Type") + headerTag = r.Header.Get(SwarmTagHeaderName) + ) + if headerTag != "" { + tagName = headerTag + log.Trace("got tag name from http header", "tagName", tagName) + } else { + tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix()) + } + + if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 { + log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength) + uri := GetURI(r.Context()) + if uri != nil { + log.Debug("got uri from context") + if uri.Addr == "encrypt" { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, true) + } else { + estimatedTotal = calculateNumberOfChunks(r.ContentLength, false) + } + } + } + + log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal) + + t, err := tags.New(tagName, estimatedTotal) + if err != nil { + log.Error("error creating tag", "err", err, "tagName", tagName) + } + + log.Trace("setting tag id to context", "uid", t.Uid) + ctx := sctx.SetTag(r.Context(), t.Uid) + + h.ServeHTTP(w, r.WithContext(ctx)) + }) +} + func InstrumentOpenTracing(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { uri := GetURI(r.Context()) diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go index d4e81d7f6..c851a3992 100644 --- a/swarm/api/http/response.go +++ b/swarm/api/http/response.go @@ -79,7 +79,7 @@ func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s } func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) { - log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) + log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code, "msg", msg) respondTemplate(w, r, "error", msg, code) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index 3c6735a73..a336bd82f 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "mime" "mime/multipart" "net/http" @@ -38,7 +39,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/rs/cors" @@ -60,6 +63,8 @@ var ( getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) ) +const SwarmTagHeaderName = "x-swarm-tag" + type methodHandler map[string]http.Handler func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { @@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server { InstrumentOpenTracing, } + tagAdapter := Adapter(func(h http.Handler) http.Handler { + return InitUploadTag(h, api.Tags) + }) + + defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter) + mux := http.NewServeMux() mux.Handle("/bzz:/", methodHandler{ "GET": Adapt( @@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostFiles), - defaultMiddlewares..., + defaultPostMiddlewares..., ), "DELETE": Adapt( http.HandlerFunc(server.HandleDelete), @@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostRaw), - defaultMiddlewares..., + defaultPostMiddlewares..., ), }) mux.Handle("/bzz-immutable:/", methodHandler{ @@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { ruid := GetRUID(r.Context()) log.Debug("handle.post.raw", "ruid", ruid) + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + postRawCount.Inc(1) toEncrypt := false @@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { return } - addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) + addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) respondError(w, r, err.Error(), http.StatusInternalServerError) return } + wait(r.Context()) + tag.DoneSplit(addr) + log.Debug("stored content", "ruid", ruid, "key", addr) w.Header().Set("Content-Type", "text/plain") @@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } log.Debug("new manifest", "ruid", ruid, "key", addr) } - newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error { switch contentType { case "application/x-tar": @@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { return } + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + + log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total()) + tag.DoneSplit(newAddr) + log.Debug("stored content", "ruid", ruid, "key", newAddr) w.Header().Set("Content-Type", "text/plain") @@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) { - log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context())) + log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") @@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize)) } +// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length +func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 { + if contentLength < 4096 { + return 1 + } + branchingFactor := 128 + if isEncrypted { + branchingFactor = 64 + } + + dataChunks := math.Ceil(float64(contentLength) / float64(4096)) + totalChunks := dataChunks + intermediate := dataChunks / float64(branchingFactor) + + for intermediate > 1 { + totalChunks += math.Ceil(intermediate) + intermediate = intermediate / float64(branchingFactor) + } + + return int64(totalChunks) + 1 +} + // The size of buffer used for bufio.Reader on LazyChunkReader passed to // http.ServeContent in HandleGetFile. // Warning: This value influences the number of chunk requests and chunker join goroutines diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index e82762ce0..1de41d18d 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -44,7 +44,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" - swarm "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" @@ -755,6 +754,7 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatal(err) } req.Header.Add("Content-Type", "application/x-tar") + req.Header.Add(SwarmTagHeaderName, "test-upload") client := &http.Client{} resp2, err := client.Do(req) if err != nil { @@ -763,6 +763,11 @@ func testBzzTar(encrypted bool, t *testing.T) { if resp2.StatusCode != http.StatusOK { t.Fatalf("err %s", resp2.Status) } + + // check that the tag was written correctly + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 4, 4, 0, 4) + swarmHash, err := ioutil.ReadAll(resp2.Body) resp2.Body.Close() if err != nil { @@ -834,6 +839,75 @@ func testBzzTar(encrypted bool, t *testing.T) { t.Fatalf("file %s did not pass content assertion", hdr.Name) } } + + // now check the tags endpoint +} + +// TestBzzCorrectTagEstimate checks that the HTTP middleware sets the total number of chunks +// in the tag according to an estimate from the HTTP request Content-Length header divided +// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore +// concurrency was introduced to slow down the HTTP request +func TestBzzCorrectTagEstimate(t *testing.T) { + srv := NewTestSwarmServer(t, serverFunc, nil) + defer srv.Close() + + for _, v := range []struct { + toEncrypt bool + expChunks int64 + }{ + {toEncrypt: false, expChunks: 248}, + {toEncrypt: true, expChunks: 250}, + } { + pr, pw := io.Pipe() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addr := "" + if v.toEncrypt { + addr = "encrypt" + } + req, err := http.NewRequest("POST", srv.URL+"/bzz:/"+addr, pr) + if err != nil { + t.Fatal(err) + } + + req = req.WithContext(ctx) + req.ContentLength = 1000000 + req.Header.Add(SwarmTagHeaderName, "1000000") + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Millisecond): + _, err := pw.Write([]byte{0}) + if err != nil { + t.Error(err) + } + } + } + }() + go func() { + transport := http.DefaultTransport + _, err := transport.RoundTrip(req) + if err != nil { + t.Error(err) + } + }() + done := false + for !done { + switch len(srv.Tags.All()) { + case 0: + <-time.After(10 * time.Millisecond) + case 1: + tag := srv.Tags.All()[0] + testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks) + srv.Tags.Delete(tag.Uid) + done = true + } + } + } } // TestBzzRootRedirect tests that getting the root path of a manifest without @@ -851,19 +925,11 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) { defer srv.Close() // create a manifest with some data at the root path - client := swarm.NewClient(srv.URL) data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - hash, err := client.Upload(file, "", toEncrypt) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader(data), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } // define a CheckRedirect hook which ensures there is only a single @@ -1046,21 +1112,10 @@ func TestGet(t *testing.T) { func TestModify(t *testing.T) { srv := NewTestSwarmServer(t, serverFunc, nil) defer srv.Close() - - swarmClient := swarm.NewClient(srv.URL) - data := []byte("data") - file := &swarm.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), - ManifestEntry: api.ManifestEntry{ - Path: "", - ContentType: "text/plain", - Size: int64(len(data)), - }, - } - - hash, err := swarmClient.Upload(file, "", false) - if err != nil { - t.Fatal(err) + headers := map[string]string{"Content-Type": "text/plain"} + res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t) + if res.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK) } for _, testCase := range []struct { @@ -1283,6 +1338,46 @@ func TestBzzGetFileWithResolver(t *testing.T) { } } +// TestCalculateNumberOfChunks is a unit test for the chunk-number-according-to-content-length +// calculation +func TestCalculateNumberOfChunks(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 248}, + {len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1}, + } { + res := calculateNumberOfChunks(tc.len, false) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + +// TestCalculateNumberOfChunksEncrypted is a unit test for the chunk-number-according-to-content-length +// calculation with encryption (branching factor=64) +func TestCalculateNumberOfChunksEncrypted(t *testing.T) { + + //test cases: + for _, tc := range []struct{ len, chunks int64 }{ + {len: 1000, chunks: 1}, + {len: 5000, chunks: 3}, + {len: 10000, chunks: 4}, + {len: 100000, chunks: 26}, + {len: 1000000, chunks: 245 + 4 + 1}, + {len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1}, + } { + res := calculateNumberOfChunks(tc.len, true) + if res != tc.chunks { + t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res) + } + } +} + // testResolver implements the Resolver interface and either returns the given // hash if it is set, or returns a "name not found" error type testResolveValidator struct { @@ -1308,6 +1403,7 @@ func (t *testResolveValidator) Resolve(addr string) (common.Hash, error) { func (t *testResolveValidator) Owner(node [32]byte) (addr common.Address, err error) { return } + func (t *testResolveValidator) HeaderByNumber(context.Context, *big.Int) (header *types.Header, err error) { return } diff --git a/swarm/api/http/test_server.go b/swarm/api/http/test_server.go index 928a6e972..fbb3366e2 100644 --- a/swarm/api/http/test_server.go +++ b/swarm/api/http/test_server.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/storage/localstore" @@ -44,7 +45,9 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) + tags := chunk.NewTags() + fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags) + // Swarm feeds test setup feedsDir, err := ioutil.TempDir("", "swarm-feeds-test") if err != nil { @@ -56,12 +59,13 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso t.Fatal(err) } - swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil) + swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil, tags) apiServer := httptest.NewServer(serverFunc(swarmApi)) tss := &TestSwarmServer{ Server: apiServer, FileStore: fileStore, + Tags: tags, dir: swarmDir, Hasher: storage.MakeHashFunc(storage.DefaultHash)(), cleanup: func() { @@ -81,6 +85,7 @@ type TestSwarmServer struct { *httptest.Server Hasher storage.SwarmHash FileStore *storage.FileStore + Tags *chunk.Tags dir string cleanup func() CurrentTime uint64 diff --git a/swarm/api/manifest_test.go b/swarm/api/manifest_test.go index 1c8e53c43..c193ebcb4 100644 --- a/swarm/api/manifest_test.go +++ b/swarm/api/manifest_test.go @@ -25,6 +25,7 @@ import ( "strings" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -42,7 +43,7 @@ func manifest(paths ...string) (manifestReader storage.LazySectionReader) { func testGetEntry(t *testing.T, path, match string, multiple bool, paths ...string) *manifestTrie { quitC := make(chan bool) - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(manifest(paths...), ref, fileStore, false, quitC, NOOPDecrypt) if err != nil { @@ -99,7 +100,7 @@ func TestGetEntry(t *testing.T) { func TestExactMatch(t *testing.T) { quitC := make(chan bool) mf := manifest("shouldBeExactMatch.css", "shouldBeExactMatch.css.map") - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(mf, ref, fileStore, false, quitC, nil) if err != nil { @@ -132,7 +133,7 @@ func TestAddFileWithManifestPath(t *testing.T) { reader := &storage.LazyTestSectionReader{ SectionReader: io.NewSectionReader(bytes.NewReader(manifest), 0, int64(len(manifest))), } - fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags()) ref := make([]byte, fileStore.HashSize()) trie, err := readManifest(reader, ref, fileStore, false, nil, NOOPDecrypt) if err != nil { diff --git a/swarm/api/storage.go b/swarm/api/storage.go deleted file mode 100644 index 254375b77..000000000 --- a/swarm/api/storage.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package api - -import ( - "context" - "path" - - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Response struct { - MimeType string - Status int - Size int64 - // Content []byte - Content string -} - -// implements a service -// -// DEPRECATED: Use the HTTP API instead -type Storage struct { - api *API -} - -func NewStorage(api *API) *Storage { - return &Storage{api} -} - -// Put uploads the content to the swarm with a simple manifest speficying -// its content type -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Put(ctx context.Context, content string, contentType string, toEncrypt bool) (storage.Address, func(context.Context) error, error) { - return s.api.Put(ctx, content, contentType, toEncrypt) -} - -// Get retrieves the content from bzzpath and reads the response in full -// It returns the Response object, which serialises containing the -// response body as the value of the Content field -// NOTE: if error is non-nil, sResponse may still have partial content -// the actual size of which is given in len(resp.Content), while the expected -// size is resp.Size -// -// DEPRECATED: Use the HTTP API instead -func (s *Storage) Get(ctx context.Context, bzzpath string) (*Response, error) { - uri, err := Parse(path.Join("bzz:/", bzzpath)) - if err != nil { - return nil, err - } - addr, err := s.api.Resolve(ctx, uri.Addr) - if err != nil { - return nil, err - } - reader, mimeType, status, _, err := s.api.Get(ctx, nil, addr, uri.Path) - if err != nil { - return nil, err - } - quitC := make(chan bool) - expsize, err := reader.Size(ctx, quitC) - if err != nil { - return nil, err - } - body := make([]byte, expsize) - size, err := reader.Read(body) - if int64(size) == expsize { - err = nil - } - return &Response{mimeType, status, expsize, string(body[:size])}, err -} diff --git a/swarm/api/storage_test.go b/swarm/api/storage_test.go deleted file mode 100644 index ef96972b6..000000000 --- a/swarm/api/storage_test.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package api - -import ( - "context" - "testing" -) - -func testStorage(t *testing.T, f func(*Storage, bool)) { - testAPI(t, func(api *API, toEncrypt bool) { - f(NewStorage(api), toEncrypt) - }) -} - -func TestStoragePutGet(t *testing.T) { - testStorage(t, func(api *Storage, toEncrypt bool) { - content := "hello" - exp := expResponse(content, "text/plain", 0) - // exp := expResponse([]byte(content), "text/plain", 0) - ctx := context.TODO() - bzzkey, wait, err := api.Put(ctx, content, exp.MimeType, toEncrypt) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - err = wait(ctx) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - bzzhash := bzzkey.Hex() - // to check put against the API#Get - resp0 := testGet(t, api.api, bzzhash, "") - checkResponse(t, resp0, exp) - - // check storage#Get - resp, err := api.Get(context.TODO(), bzzhash) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - checkResponse(t, &testResponse{nil, resp}, exp) - }) -} diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go index 359ac11ac..ee700d22b 100644 --- a/swarm/chunk/tag.go +++ b/swarm/chunk/tag.go @@ -34,11 +34,11 @@ var ( type State = uint32 const ( - SPLIT State = iota // chunk has been processed by filehasher/swarm safe call - STORED // chunk stored locally - SEEN // chunk previously seen - SENT // chunk sent to neighbourhood - SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere + StateSplit State = iota // chunk has been processed by filehasher/swarm safe call + StateStored // chunk stored locally + StateSeen // chunk previously seen + StateSent // chunk sent to neighbourhood + StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere ) // Tag represents info on the status of new chunks @@ -46,18 +46,18 @@ type Tag struct { Uid uint32 // a unique identifier for this tag Name string // a name tag for this tag Address Address // the associated swarm hash for this tag - total uint32 // total chunks belonging to a tag - split uint32 // number of chunks already processed by splitter for hashing - seen uint32 // number of chunks already seen - stored uint32 // number of chunks already stored locally - sent uint32 // number of chunks sent for push syncing - synced uint32 // number of chunks synced with proof + total int64 // total chunks belonging to a tag + split int64 // number of chunks already processed by splitter for hashing + seen int64 // number of chunks already seen + stored int64 // number of chunks already stored locally + sent int64 // number of chunks sent for push syncing + synced int64 // number of chunks synced with proof startedAt time.Time // tag started to calculate ETA } // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func NewTag(uid uint32, s string, total uint32) *Tag { +func NewTag(uid uint32, s string, total int64) *Tag { t := &Tag{ Uid: uid, Name: s, @@ -69,65 +69,65 @@ func NewTag(uid uint32, s string, total uint32) *Tag { // Inc increments the count for a state func (t *Tag) Inc(state State) { - var v *uint32 + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - atomic.AddUint32(v, 1) + atomic.AddInt64(v, 1) } // Get returns the count for a state on a tag -func (t *Tag) Get(state State) int { - var v *uint32 +func (t *Tag) Get(state State) int64 { + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - return int(atomic.LoadUint32(v)) + return atomic.LoadInt64(v) } // GetTotal returns the total count -func (t *Tag) Total() int { - return int(atomic.LoadUint32(&t.total)) +func (t *Tag) Total() int64 { + return atomic.LoadInt64(&t.total) } // DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag // is meant to be called when splitter finishes for input streams of unknown size -func (t *Tag) DoneSplit(address Address) int { - total := atomic.LoadUint32(&t.split) - atomic.StoreUint32(&t.total, total) +func (t *Tag) DoneSplit(address Address) int64 { + total := atomic.LoadInt64(&t.split) + atomic.StoreInt64(&t.total, total) t.Address = address - return int(total) + return total } // Status returns the value of state and the total count -func (t *Tag) Status(state State) (int, int, error) { - count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total)) +func (t *Tag) Status(state State) (int64, int64, error) { + count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total) if total == 0 { return count, total, errNA } switch state { - case SPLIT, STORED, SEEN: + case StateSplit, StateStored, StateSeen: return count, total, nil - case SENT, SYNCED: - stored := int(atomic.LoadUint32(&t.stored)) + case StateSent, StateSynced: + stored := atomic.LoadInt64(&t.stored) if stored < total { return count, total - seen, errNA } @@ -152,14 +152,14 @@ func (t *Tag) ETA(state State) (time.Time, error) { // MarshalBinary marshals the tag into a byte slice func (tag *Tag) MarshalBinary() (data []byte, err error) { - buffer := make([]byte, 0) - encodeUint32Append(&buffer, tag.Uid) - encodeUint32Append(&buffer, tag.total) - encodeUint32Append(&buffer, tag.split) - encodeUint32Append(&buffer, tag.seen) - encodeUint32Append(&buffer, tag.stored) - encodeUint32Append(&buffer, tag.sent) - encodeUint32Append(&buffer, tag.synced) + buffer := make([]byte, 4) + binary.BigEndian.PutUint32(buffer, tag.Uid) + encodeInt64Append(&buffer, tag.total) + encodeInt64Append(&buffer, tag.split) + encodeInt64Append(&buffer, tag.seen) + encodeInt64Append(&buffer, tag.stored) + encodeInt64Append(&buffer, tag.sent) + encodeInt64Append(&buffer, tag.synced) intBuffer := make([]byte, 8) @@ -181,14 +181,15 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { if len(buffer) < 13 { return errors.New("buffer too short") } + tag.Uid = binary.BigEndian.Uint32(buffer) + buffer = buffer[4:] - tag.Uid = decodeUint32Splice(&buffer) - tag.total = decodeUint32Splice(&buffer) - tag.split = decodeUint32Splice(&buffer) - tag.seen = decodeUint32Splice(&buffer) - tag.stored = decodeUint32Splice(&buffer) - tag.sent = decodeUint32Splice(&buffer) - tag.synced = decodeUint32Splice(&buffer) + tag.total = decodeInt64Splice(&buffer) + tag.split = decodeInt64Splice(&buffer) + tag.seen = decodeInt64Splice(&buffer) + tag.stored = decodeInt64Splice(&buffer) + tag.sent = decodeInt64Splice(&buffer) + tag.synced = decodeInt64Splice(&buffer) t, n := binary.Varint(buffer) tag.startedAt = time.Unix(t, 0) @@ -202,17 +203,16 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { tag.Name = string(buffer[t:]) return nil - } -func encodeUint32Append(buffer *[]byte, val uint32) { - intBuffer := make([]byte, 4) - binary.BigEndian.PutUint32(intBuffer, val) - *buffer = append(*buffer, intBuffer...) +func encodeInt64Append(buffer *[]byte, val int64) { + intBuffer := make([]byte, 8) + n := binary.PutVarint(intBuffer, val) + *buffer = append(*buffer, intBuffer[:n]...) } -func decodeUint32Splice(buffer *[]byte) uint32 { - val := binary.BigEndian.Uint32((*buffer)[:4]) - *buffer = (*buffer)[4:] +func decodeInt64Splice(buffer *[]byte) int64 { + val, n := binary.Varint((*buffer)) + *buffer = (*buffer)[n:] return val } diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go index b3f3be2ca..e6acfb185 100644 --- a/swarm/chunk/tag_test.go +++ b/swarm/chunk/tag_test.go @@ -24,7 +24,7 @@ import ( ) var ( - allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED} + allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced} ) // TestTagSingleIncrements tests if Inc increments the tag state value @@ -34,14 +34,14 @@ func TestTagSingleIncrements(t *testing.T) { tc := []struct { state uint32 inc int - expcount int - exptotal int + expcount int64 + exptotal int64 }{ - {state: SPLIT, inc: 10, expcount: 10, exptotal: 10}, - {state: STORED, inc: 9, expcount: 9, exptotal: 9}, - {state: SEEN, inc: 1, expcount: 1, exptotal: 10}, - {state: SENT, inc: 9, expcount: 9, exptotal: 9}, - {state: SYNCED, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSplit, inc: 10, expcount: 10, exptotal: 10}, + {state: StateStored, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSeen, inc: 1, expcount: 1, exptotal: 10}, + {state: StateSent, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSynced, inc: 9, expcount: 9, exptotal: 9}, } for _, tc := range tc { @@ -60,24 +60,24 @@ func TestTagSingleIncrements(t *testing.T) { // TestTagStatus is a unit test to cover Tag.Status method functionality func TestTagStatus(t *testing.T) { tg := &Tag{total: 10} - tg.Inc(SEEN) - tg.Inc(SENT) - tg.Inc(SYNCED) + tg.Inc(StateSeen) + tg.Inc(StateSent) + tg.Inc(StateSynced) for i := 0; i < 10; i++ { - tg.Inc(SPLIT) - tg.Inc(STORED) + tg.Inc(StateSplit) + tg.Inc(StateStored) } for _, v := range []struct { state State - expVal int - expTotal int + expVal int64 + expTotal int64 }{ - {state: STORED, expVal: 10, expTotal: 10}, - {state: SPLIT, expVal: 10, expTotal: 10}, - {state: SEEN, expVal: 1, expTotal: 10}, - {state: SENT, expVal: 1, expTotal: 9}, - {state: SYNCED, expVal: 1, expTotal: 9}, + {state: StateStored, expVal: 10, expTotal: 10}, + {state: StateSplit, expVal: 10, expTotal: 10}, + {state: StateSeen, expVal: 1, expTotal: 10}, + {state: StateSent, expVal: 1, expTotal: 9}, + {state: StateSynced, expVal: 1, expTotal: 9}, } { val, total, err := tg.Status(v.state) if err != nil { @@ -98,8 +98,8 @@ func TestTagETA(t *testing.T) { maxDiff := 100000 // 100 microsecond tg := &Tag{total: 10, startedAt: now} time.Sleep(100 * time.Millisecond) - tg.Inc(SPLIT) - eta, err := tg.ETA(SPLIT) + tg.Inc(StateSplit) + eta, err := tg.ETA(StateSplit) if err != nil { t.Fatal(err) } @@ -128,7 +128,7 @@ func TestTagConcurrentIncrements(t *testing.T) { wg.Wait() for _, f := range allStates { v := tg.Get(f) - if v != n { + if v != int64(n) { t.Fatalf("expected state %v to be %v, got %v", f, n, v) } } @@ -142,7 +142,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { wg.Add(10 * 5 * n) for i := 0; i < 10; i++ { s := string([]byte{uint8(i)}) - tag, err := ts.New(s, n) + tag, err := ts.New(s, int64(n)) if err != nil { t.Fatal(err) } @@ -168,7 +168,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { t.Fatal(err) } stateVal := tag.Get(f) - if stateVal != n { + if stateVal != int64(n) { t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) } } diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go index 07f9b8cd7..435f5d706 100644 --- a/swarm/chunk/tags.go +++ b/swarm/chunk/tags.go @@ -42,12 +42,12 @@ func NewTags() *Tags { // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func (ts *Tags) New(s string, total int) (*Tag, error) { +func (ts *Tags) New(s string, total int64) (*Tag, error) { t := &Tag{ Uid: ts.rng.Uint32(), Name: s, startedAt: time.Now(), - total: uint32(total), + total: total, } if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { return nil, errExists @@ -55,6 +55,18 @@ func (ts *Tags) New(s string, total int) (*Tag, error) { return t, nil } +// All returns all existing tags in Tags' sync.Map +// Note that tags are returned in no particular order +func (ts *Tags) All() (t []*Tag) { + ts.tags.Range(func(k, v interface{}) bool { + t = append(t, v.(*Tag)) + + return true + }) + + return t +} + // Get returns the undelying tag for the uid or an error if not found func (ts *Tags) Get(uid uint32) (*Tag, error) { t, ok := ts.tags.Load(uid) @@ -64,8 +76,8 @@ func (ts *Tags) Get(uid uint32) (*Tag, error) { return t.(*Tag), nil } -// GetContext gets a tag from the tag uid stored in the context -func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { +// GetFromContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) { uid := sctx.GetTag(ctx) t, ok := ts.tags.Load(uid) if !ok { @@ -78,3 +90,7 @@ func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { func (ts *Tags) Range(fn func(k, v interface{}) bool) { ts.tags.Range(fn) } + +func (ts *Tags) Delete(k interface{}) { + ts.tags.Delete(k) +} diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go new file mode 100644 index 000000000..f818c4c5c --- /dev/null +++ b/swarm/chunk/tags_test.go @@ -0,0 +1,48 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import "testing" + +func TestAll(t *testing.T) { + ts := NewTags() + + ts.New("1", 1) + ts.New("2", 1) + + all := ts.All() + + if len(all) != 2 { + t.Fatalf("expected length to be 2 got %d", len(all)) + } + + if n := all[0].Total(); n != 1 { + t.Fatalf("expected tag 0 total to be 1 got %d", n) + } + + if n := all[1].Total(); n != 1 { + t.Fatalf("expected tag 1 total to be 1 got %d", n) + } + + ts.New("3", 1) + all = ts.All() + + if len(all) != 3 { + t.Fatalf("expected length to be 3 got %d", len(all)) + } + +} diff --git a/swarm/fuse/swarmfs_test.go b/swarm/fuse/swarmfs_test.go index 460e31c4e..77573f0fc 100644 --- a/swarm/fuse/swarmfs_test.go +++ b/swarm/fuse/swarmfs_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" @@ -1614,11 +1615,11 @@ func TestFUSE(t *testing.T) { } defer os.RemoveAll(datadir) - fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags()) if err != nil { t.Fatal(err) } - ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil)} + ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil, chunk.NewTags())} //run a short suite of tests //approx time: 28s diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 8e6be72b6..615b3b68f 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -127,7 +127,7 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, return nil, nil, nil, err } - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags()) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) delivery := NewDelivery(kad, netStore) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 5f73f7cc8..fc0f9d5df 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -380,7 +380,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) i++ } //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags()) //now we can actually upload a (random) file to the round-robin store size := chunkCount * chunkSize log.Debug("Storing data to file store") diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index fefdb7c9f..da4ff673b 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -298,7 +298,7 @@ func mapKeysToNodes(conf *synctestConfig) { //upload a file(chunks) to a single local node store func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, store chunk.Store) ([]storage.Address, error) { log.Debug(fmt.Sprintf("Uploading to node id: %s", id)) - fileStore := storage.NewFileStore(store, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(store, storage.NewFileStoreParams(), chunk.NewTags()) size := chunkSize var rootAddrs []storage.Address for i := 0; i < chunkCount; i++ { diff --git a/swarm/network_test.go b/swarm/network_test.go index 97bdd07b1..1a8c992a3 100644 --- a/swarm/network_test.go +++ b/swarm/network_test.go @@ -23,11 +23,13 @@ import ( "io/ioutil" "math/rand" "os" + "strings" "sync" "sync/atomic" "testing" "time" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/ethereum/go-ethereum/crypto" @@ -416,7 +418,7 @@ func uploadFile(swarm *Swarm) (storage.Address, string, error) { // uniqueness is very certain. data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b) ctx := context.TODO() - k, wait, err := swarm.api.Put(ctx, data, "text/plain", false) + k, wait, err := putString(ctx, swarm.api, data, "text/plain", false) if err != nil { return nil, "", err } @@ -530,3 +532,31 @@ func retrieve( return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount) } + +// putString provides singleton manifest creation on top of api.API +func putString(ctx context.Context, a *api.API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + r := strings.NewReader(content) + tag, err := a.Tags.New("unnamed-tag", 0) + + log.Trace("created new tag", "uid", tag.Uid) + + cCtx := sctx.SetTag(ctx, tag.Uid) + key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt) + if err != nil { + return nil, nil, err + } + manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) + r = strings.NewReader(manifest) + key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt) + if err != nil { + return nil, nil, err + } + tag.DoneSplit(key) + return key, func(ctx context.Context) error { + err := waitContent(ctx) + if err != nil { + return err + } + return waitManifest(ctx) + }, nil +} diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index 9a1259444..a0fe2e769 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -24,6 +24,7 @@ import ( "io" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -42,8 +43,10 @@ type chunkerTester struct { t test } +var mockTag = chunk.NewTag(0, "mock-tag", 0) + func newTestHasherStore(store ChunkStore, hash string) *hasherStore { - return NewHasherStore(store, MakeHashFunc(hash), false) + return NewHasherStore(store, MakeHashFunc(hash), false, chunk.NewTag(0, "test-tag", 0)) } func testRandomBrokenData(n int, tester *chunkerTester) { @@ -91,7 +94,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) var err error ctx := context.TODO() if usePyramid { - addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter) + addr, wait, err = PyramidSplit(ctx, data, putGetter, putGetter, mockTag) } else { addr, wait, err = TreeSplit(ctx, data, int64(n), putGetter) } @@ -188,7 +191,7 @@ func TestDataAppend(t *testing.T) { putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.TODO() - addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { tester.t.Fatalf(err.Error()) } @@ -208,7 +211,7 @@ func TestDataAppend(t *testing.T) { } putGetter = newTestHasherStore(store, SHA3Hash) - newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter) + newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter, mockTag) if err != nil { tester.t.Fatalf(err.Error()) } @@ -278,7 +281,7 @@ func benchmarkSplitJoin(n int, t *testing.B) { putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash) ctx := context.TODO() - key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -335,7 +338,7 @@ func benchmarkSplitPyramidBMT(n int, t *testing.B) { putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash) ctx := context.Background() - _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -353,7 +356,7 @@ func benchmarkSplitPyramidSHA3(n int, t *testing.B) { putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash) ctx := context.Background() - _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + _, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -374,7 +377,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { putGetter := newTestHasherStore(store, SHA3Hash) ctx := context.Background() - key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter) + key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } @@ -384,7 +387,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) { } putGetter = newTestHasherStore(store, SHA3Hash) - _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter) + _, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter, mockTag) if err != nil { t.Fatalf(err.Error()) } diff --git a/swarm/storage/filestore.go b/swarm/storage/filestore.go index 2b15f7da6..dc096e56c 100644 --- a/swarm/storage/filestore.go +++ b/swarm/storage/filestore.go @@ -47,6 +47,7 @@ const ( type FileStore struct { ChunkStore hashFunc SwarmHasher + tags *chunk.Tags } type FileStoreParams struct { @@ -60,19 +61,20 @@ func NewFileStoreParams() *FileStoreParams { } // for testing locally -func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) { +func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, error) { localStore, err := localstore.New(datadir, basekey, nil) if err != nil { return nil, err } - return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams()), nil + return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), nil } -func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore { +func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *FileStore { hashFunc := MakeHashFunc(params.Hash) return &FileStore{ ChunkStore: store, hashFunc: hashFunc, + tags: tags, } } @@ -83,7 +85,11 @@ func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore { // It returns a reader with the chunk data and whether the content was encrypted func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChunkReader, isEncrypted bool) { isEncrypted = len(addr) > f.hashFunc().Size() - getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted) + tag, err := f.tags.GetFromContext(ctx) + if err != nil { + tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0) + } + getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted, tag) reader = TreeJoin(ctx, addr, getter, 0) return } @@ -91,8 +97,17 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu // Store is a public API. Main entry point for document storage directly. Used by the // FS-aware API and httpaccess func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEncrypt bool) (addr Address, wait func(context.Context) error, err error) { - putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt) - return PyramidSplit(ctx, data, putter, putter) + tag, err := f.tags.GetFromContext(ctx) + if err != nil { + // some of the parts of the codebase, namely the manifest trie, do not store the context + // of the original request nor the tag with the trie, recalculating the trie hence + // loses the tag uid. thus we create an ephemeral tag here for that purpose + + tag = chunk.NewTag(0, "", 0) + //return nil, nil, err + } + putter := NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag) + return PyramidSplit(ctx, data, putter, putter, tag) } func (f *FileStore) HashSize() int { @@ -101,12 +116,14 @@ func (f *FileStore) HashSize() int { // GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) { + tag := chunk.NewTag(0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results + // create a special kind of putter, which only will store the references putter := &hashExplorer{ - hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt), + hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt, tag), } // do the actual splitting anyway, no way around it - _, wait, err := PyramidSplit(ctx, data, putter, putter) + _, wait, err := PyramidSplit(ctx, data, putter, putter, tag) if err != nil { return nil, err } diff --git a/swarm/storage/filestore_test.go b/swarm/storage/filestore_test.go index fe01eed9a..d0a167a24 100644 --- a/swarm/storage/filestore_test.go +++ b/swarm/storage/filestore_test.go @@ -25,6 +25,7 @@ import ( "path/filepath" "testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/localstore" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -48,7 +49,7 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) slice := testutil.RandomBytes(1, testDataSize) ctx := context.TODO() @@ -113,7 +114,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) slice := testutil.RandomBytes(1, testDataSize) ctx := context.TODO() key, wait, err := fileStore.Store(ctx, bytes.NewReader(slice), testDataSize, toEncrypt) @@ -182,7 +183,7 @@ func TestGetAllReferences(t *testing.T) { } defer localStore.Close() - fileStore := NewFileStore(localStore, NewFileStoreParams()) + fileStore := NewFileStore(localStore, NewFileStoreParams(), chunk.NewTags()) // testRuns[i] and expectedLen[i] are dataSize and expected length respectively testRuns := []int{1024, 8192, 16000, 30000, 1000000} diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index 2e4a1c11b..1e702f11a 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -28,6 +28,7 @@ import ( type hasherStore struct { store ChunkStore + tag *chunk.Tag toEncrypt bool hashFunc SwarmHasher hashSize int // content hash size @@ -44,7 +45,7 @@ type hasherStore struct { // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces. // With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore // and the hasherStore will take core of encryption/decryption of data if necessary -func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore { +func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool, tag *chunk.Tag) *hasherStore { hashSize := hashFunc().Size() refSize := int64(hashSize) if toEncrypt { @@ -53,6 +54,7 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *has h := &hasherStore{ store: store, + tag: tag, toEncrypt: toEncrypt, hashFunc: hashFunc, hashSize: hashSize, @@ -242,7 +244,11 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) { atomic.AddUint64(&h.nrChunks, 1) go func() { - _, err := h.store.Put(ctx, chunk.ModePutUpload, ch) + seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch) + h.tag.Inc(chunk.StateStored) + if seen { + h.tag.Inc(chunk.StateSeen) + } select { case h.errC <- err: case <-h.quitC: diff --git a/swarm/storage/hasherstore_test.go b/swarm/storage/hasherstore_test.go index c95537db7..9dfd7ab1d 100644 --- a/swarm/storage/hasherstore_test.go +++ b/swarm/storage/hasherstore_test.go @@ -43,7 +43,7 @@ func TestHasherStore(t *testing.T) { for _, tt := range tests { chunkStore := NewMapChunkStore() - hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt) + hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 0)) // Put two random chunks into the hasherStore chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data() diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 281bbe9fe..9b0d5397b 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -96,12 +96,12 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Address), the root hash of the entire content will fill this once processing finishes. New chunks to store are store using the putter which the caller provides. */ -func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) +func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) { + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize), tag).Split(ctx) } -func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) +func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter, tag *chunk.Tag) (Address, func(context.Context) error, error) { + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize), tag).Append(ctx) } // Entry to create a tree node @@ -142,6 +142,7 @@ type PyramidChunker struct { putter Putter getter Getter key Address + tag *chunk.Tag workerCount int64 workerLock sync.RWMutex jobC chan *chunkJob @@ -152,7 +153,7 @@ type PyramidChunker struct { chunkLevel [][]*TreeEntry } -func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { +func NewPyramidSplitter(params *PyramidSplitterParams, tag *chunk.Tag) (pc *PyramidChunker) { pc = &PyramidChunker{} pc.reader = params.reader pc.hashSize = params.hashSize @@ -161,6 +162,7 @@ func NewPyramidSplitter(params *PyramidSplitterParams) (pc *PyramidChunker) { pc.putter = params.putter pc.getter = params.getter pc.key = params.addr + pc.tag = tag pc.workerCount = 0 pc.jobC = make(chan *chunkJob, 2*ChunkProcessors) pc.wg = &sync.WaitGroup{} @@ -273,6 +275,7 @@ func (pc *PyramidChunker) processor(ctx context.Context, id int64) { return } pc.processChunk(ctx, id, job) + pc.tag.Inc(chunk.StateSplit) case <-pc.quitC: return } diff --git a/swarm/swarm.go b/swarm/swarm.go index 2f025d9cc..d004bcd2f 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -211,9 +211,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e MaxPeerServers: config.MaxStreamPeerServers, } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap) + tags := chunk.NewTags() //todo load from state store // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams) + self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams, tags) log.Debug("Setup local storage") @@ -228,7 +229,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e pss.SetHandshakeController(self.ps, pss.NewHandshakeParams()) } - self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey) + self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags) self.sfs = fuse.NewSwarmFS(self.api) log.Debug("Initialized FUSE filesystem") diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index 2a5b28513..cf2afaec9 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/sctx" ) // TestNewSwarm validates Swarm fields in repsect to the provided configuration. @@ -352,8 +353,11 @@ func testLocalStoreAndRetrieve(t *testing.T, swarm *Swarm, n int, randomData boo rand.Read(slice) } dataPut := string(slice) - - ctx := context.TODO() + tag, err := swarm.api.Tags.New("test-local-store-and-retrieve", 0) + if err != nil { + t.Fatal(err) + } + ctx := sctx.SetTag(context.Background(), tag.Uid) k, wait, err := swarm.api.Store(ctx, strings.NewReader(dataPut), int64(len(dataPut)), false) if err != nil { t.Fatal(err) diff --git a/swarm/testutil/tag.go b/swarm/testutil/tag.go new file mode 100644 index 000000000..d9908f11b --- /dev/null +++ b/swarm/testutil/tag.go @@ -0,0 +1,51 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package testutil + +import ( + "testing" + + "github.com/ethereum/go-ethereum/swarm/chunk" +) + +// CheckTag checks the first tag in the api struct to be in a certain state +func CheckTag(t *testing.T, tag *chunk.Tag, split, stored, seen, total int64) { + t.Helper() + if tag == nil { + t.Fatal("no tag found") + } + + tSplit := tag.Get(chunk.StateSplit) + if tSplit != split { + t.Fatalf("should have had split chunks, got %d want %d", tSplit, split) + } + + tSeen := tag.Get(chunk.StateSeen) + if tSeen != seen { + t.Fatalf("should have had seen chunks, got %d want %d", tSeen, seen) + } + + tStored := tag.Get(chunk.StateStored) + if tStored != stored { + t.Fatalf("mismatch stored chunks, got %d want %d", tStored, stored) + } + + tTotal := tag.Total() + if tTotal != total { + t.Fatalf("mismatch total chunks, got %d want %d", tTotal, total) + } +}