diff --git a/swarm/storage/localstore/doc.go b/swarm/storage/localstore/doc.go new file mode 100644 index 000000000..98f6fc40a --- /dev/null +++ b/swarm/storage/localstore/doc.go @@ -0,0 +1,56 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +/* +Package localstore provides disk storage layer for Swarm Chunk persistence. +It uses swarm/shed abstractions on top of github.com/syndtr/goleveldb LevelDB +implementation. + +The main type is DB which manages the storage by providing methods to +access and add Chunks and to manage their status. + +Modes are abstractions that do specific changes to Chunks. There are three +mode types: + + - ModeGet, for Chunk access + - ModePut, for adding Chunks to the database + - ModeSet, for changing Chunk statuses + +Every mode type has a corresponding type (Getter, Putter and Setter) +that provides adequate method to perform the opperation and that type +should be injected into localstore consumers instead the whole DB. +This provides more clear insight which operations consumer is performing +on the database. + +Getters, Putters and Setters accept different get, put and set modes +to perform different actions. For example, ModeGet has two different +variables ModeGetRequest and ModeGetSync and two different Getters +can be constructed with them that are used when the chunk is requested +or when the chunk is synced as this two events are differently changing +the database. + +Subscription methods are implemented for a specific purpose of +continuous iterations over Chunks that should be provided to +Push and Pull syncing. + +DB implements an internal garbage collector that removes only synced +Chunks from the database based on their most recent access time. + +Internally, DB stores Chunk data and any required information, such as +store and access timestamps in different shed indexes that can be +iterated on by garbage collector or subscriptions. +*/ +package localstore diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go new file mode 100644 index 000000000..7718d1e58 --- /dev/null +++ b/swarm/storage/localstore/gc.go @@ -0,0 +1,302 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +/* +Counting number of items in garbage collection index + +The number of items in garbage collection index is not the same as the number of +chunks in retrieval index (total number of stored chunks). Chunk can be garbage +collected only when it is set to a synced state by ModSetSync, and only then can +be counted into garbage collection size, which determines whether a number of +chunk should be removed from the storage by the garbage collection. This opens a +possibility that the storage size exceeds the limit if files are locally +uploaded and the node is not connected to other nodes or there is a problem with +syncing. + +Tracking of garbage collection size (gcSize) is focused on performance. Key +points: + + 1. counting the number of key/value pairs in LevelDB takes around 0.7s for 1e6 + on a very fast ssd (unacceptable long time in reality) + 2. locking leveldb batch writes with a global mutex (serial batch writes) is + not acceptable, we should use locking per chunk address + +Because of point 1. we cannot count the number of items in garbage collection +index in New constructor as it could last very long for realistic scenarios +where limit is 5e6 and nodes are running on slower hdd disks or cloud providers +with low IOPS. + +Point 2. is a performance optimization to allow parallel batch writes with +getters, putters and setters. Every single batch that they create contain only +information related to a single chunk, no relations with other chunks or shared +statistical data (like gcSize). This approach avoids race conditions on writing +batches in parallel, but creates a problem of synchronizing statistical data +values like gcSize. With global mutex lock, any data could be written by any +batch, but would not use utilize the full potential of leveldb parallel writes. + +To mitigate this two problems, the implementation of counting and persisting +gcSize is split into two parts. One is the in-memory value (gcSize) that is fast +to read and write with a dedicated mutex (gcSizeMu) if the batch which adds or +removes items from garbage collection index is successful. The second part is +the reliable persistence of this value to leveldb database, as storedGCSize +field. This database field is saved by writeGCSizeWorker and writeGCSize +functions when in-memory gcSize variable is changed, but no too often to avoid +very frequent database writes. This database writes are triggered by +writeGCSizeTrigger when a call is made to function incGCSize. Trigger ensures +that no database writes are done only when gcSize is changed (contrary to a +simpler periodic writes or checks). A backoff of 10s in writeGCSizeWorker +ensures that no frequent batch writes are made. Saving the storedGCSize on +database Close function ensures that in-memory gcSize is persisted when database +is closed. + +This persistence must be resilient to failures like panics. For this purpose, a +collection of hashes that are added to the garbage collection index, but still +not persisted to storedGCSize, must be tracked to count them in when DB is +constructed again with New function after the failure (swarm node restarts). On +every batch write that adds a new item to garbage collection index, the same +hash is added to gcUncountedHashesIndex. This ensures that there is a persisted +information which hashes were added to the garbage collection index. But, when +the storedGCSize is saved by writeGCSize function, this values are removed in +the same batch in which storedGCSize is changed to ensure consistency. When the +panic happen, or database Close method is not saved. The database storage +contains all information to reliably and efficiently get the correct number of +items in garbage collection index. This is performed in the New function when +all hashes in gcUncountedHashesIndex are counted, added to the storedGCSize and +saved to the disk before the database is constructed again. Index +gcUncountedHashesIndex is acting as dirty bit for recovery that provides +information what needs to be corrected. With a simple dirty bit, the whole +garbage collection index should me counted on recovery instead only the items in +gcUncountedHashesIndex. Because of the triggering mechanizm of writeGCSizeWorker +and relatively short backoff time, the number of hashes in +gcUncountedHashesIndex should be low and it should take a very short time to +recover from the previous failure. If there was no failure and +gcUncountedHashesIndex is empty, which is the usual case, New function will take +the minimal time to return. +*/ + +package localstore + +import ( + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" +) + +var ( + // gcTargetRatio defines the target number of items + // in garbage collection index that will not be removed + // on garbage collection. The target number of items + // is calculated by gcTarget function. This value must be + // in range (0,1]. For example, with 0.9 value, + // garbage collection will leave 90% of defined capacity + // in database after its run. This prevents frequent + // garbage collection runs. + gcTargetRatio = 0.9 + // gcBatchSize limits the number of chunks in a single + // leveldb batch on garbage collection. + gcBatchSize int64 = 1000 +) + +// collectGarbageWorker is a long running function that waits for +// collectGarbageTrigger channel to signal a garbage collection +// run. GC run iterates on gcIndex and removes older items +// form retrieval and other indexes. +func (db *DB) collectGarbageWorker() { + for { + select { + case <-db.collectGarbageTrigger: + // run a single collect garbage run and + // if done is false, gcBatchSize is reached and + // another collect garbage run is needed + collectedCount, done, err := db.collectGarbage() + if err != nil { + log.Error("localstore collect garbage", "err", err) + } + // check if another gc run is needed + if !done { + db.triggerGarbageCollection() + } + + if testHookCollectGarbage != nil { + testHookCollectGarbage(collectedCount) + } + case <-db.close: + return + } + } +} + +// collectGarbage removes chunks from retrieval and other +// indexes if maximal number of chunks in database is reached. +// This function returns the number of removed chunks. If done +// is false, another call to this function is needed to collect +// the rest of the garbage as the batch size limit is reached. +// This function is called in collectGarbageWorker. +func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) { + batch := new(leveldb.Batch) + target := db.gcTarget() + + done = true + err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { + // protect parallel updates + unlock, err := db.lockAddr(item.Address) + if err != nil { + return false, err + } + defer unlock() + + gcSize := db.getGCSize() + if gcSize-collectedCount <= target { + return true, nil + } + // delete from retrieve, pull, gc + db.retrievalDataIndex.DeleteInBatch(batch, item) + db.retrievalAccessIndex.DeleteInBatch(batch, item) + db.pullIndex.DeleteInBatch(batch, item) + db.gcIndex.DeleteInBatch(batch, item) + collectedCount++ + if collectedCount >= gcBatchSize { + // bach size limit reached, + // another gc run is needed + done = false + return true, nil + } + return false, nil + }, nil) + if err != nil { + return 0, false, err + } + + err = db.shed.WriteBatch(batch) + if err != nil { + return 0, false, err + } + // batch is written, decrement gcSize + db.incGCSize(-collectedCount) + return collectedCount, done, nil +} + +// gcTrigger retruns the absolute value for garbage collection +// target value, calculated from db.capacity and gcTargetRatio. +func (db *DB) gcTarget() (target int64) { + return int64(float64(db.capacity) * gcTargetRatio) +} + +// incGCSize increments gcSize by the provided number. +// If count is negative, it will decrement gcSize. +func (db *DB) incGCSize(count int64) { + if count == 0 { + return + } + + db.gcSizeMu.Lock() + new := db.gcSize + count + db.gcSize = new + db.gcSizeMu.Unlock() + + select { + case db.writeGCSizeTrigger <- struct{}{}: + default: + } + if new >= db.capacity { + db.triggerGarbageCollection() + } +} + +// getGCSize returns gcSize value by locking it +// with gcSizeMu mutex. +func (db *DB) getGCSize() (count int64) { + db.gcSizeMu.RLock() + count = db.gcSize + db.gcSizeMu.RUnlock() + return count +} + +// triggerGarbageCollection signals collectGarbageWorker +// to call collectGarbage. +func (db *DB) triggerGarbageCollection() { + select { + case db.collectGarbageTrigger <- struct{}{}: + case <-db.close: + default: + } +} + +// writeGCSizeWorker writes gcSize on trigger event +// and waits writeGCSizeDelay after each write. +// It implements a linear backoff with delay of +// writeGCSizeDelay duration to avoid very frequent +// database operations. +func (db *DB) writeGCSizeWorker() { + for { + select { + case <-db.writeGCSizeTrigger: + err := db.writeGCSize(db.getGCSize()) + if err != nil { + log.Error("localstore write gc size", "err", err) + } + // Wait some time before writing gc size in the next + // iteration. This prevents frequent I/O operations. + select { + case <-time.After(10 * time.Second): + case <-db.close: + return + } + case <-db.close: + return + } + } +} + +// writeGCSize stores the number of items in gcIndex. +// It removes all hashes from gcUncountedHashesIndex +// not to include them on the next DB initialization +// (New function) when gcSize is counted. +func (db *DB) writeGCSize(gcSize int64) (err error) { + const maxBatchSize = 1000 + + batch := new(leveldb.Batch) + db.storedGCSize.PutInBatch(batch, uint64(gcSize)) + batchSize := 1 + + // use only one iterator as it acquires its snapshot + // not to remove hashes from index that are added + // after stored gc size is written + err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) { + db.gcUncountedHashesIndex.DeleteInBatch(batch, item) + batchSize++ + if batchSize >= maxBatchSize { + err = db.shed.WriteBatch(batch) + if err != nil { + return false, err + } + batch.Reset() + batchSize = 0 + } + return false, nil + }, nil) + if err != nil { + return err + } + return db.shed.WriteBatch(batch) +} + +// testHookCollectGarbage is a hook that can provide +// information when a garbage collection run is done +// and how many items it removed. +var testHookCollectGarbage func(collectedCount int64) diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go new file mode 100644 index 000000000..eb039a554 --- /dev/null +++ b/swarm/storage/localstore/gc_test.go @@ -0,0 +1,358 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// TestDB_collectGarbageWorker tests garbage collection runs +// by uploading and syncing a number of chunks. +func TestDB_collectGarbageWorker(t *testing.T) { + testDB_collectGarbageWorker(t) +} + +// TestDB_collectGarbageWorker_multipleBatches tests garbage +// collection runs by uploading and syncing a number of +// chunks by having multiple smaller batches. +func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) { + // lower the maximal number of chunks in a single + // gc batch to ensure multiple batches. + defer func(s int64) { gcBatchSize = s }(gcBatchSize) + gcBatchSize = 2 + + testDB_collectGarbageWorker(t) +} + +// testDB_collectGarbageWorker is a helper test function to test +// garbage collection runs by uploading and syncing a number of chunks. +func testDB_collectGarbageWorker(t *testing.T) { + chunkCount := 150 + + testHookCollectGarbageChan := make(chan int64) + defer setTestHookCollectGarbage(func(collectedCount int64) { + testHookCollectGarbageChan <- collectedCount + })() + + db, cleanupFunc := newTestDB(t, &Options{ + Capacity: 100, + }) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + syncer := db.NewSetter(ModeSetSync) + + addrs := make([]storage.Address, 0) + + // upload random chunks + for i := 0; i < chunkCount; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + err = syncer.Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + addrs = append(addrs, chunk.Address()) + } + + gcTarget := db.gcTarget() + + for { + select { + case <-testHookCollectGarbageChan: + case <-time.After(10 * time.Second): + t.Error("collect garbage timeout") + } + gcSize := db.getGCSize() + if gcSize == gcTarget { + break + } + } + + t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget))) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) + + t.Run("gc size", newIndexGCSizeTest(db)) + + // the first synced chunk should be removed + t.Run("get the first synced chunk", func(t *testing.T) { + _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + if err != storage.ErrChunkNotFound { + t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound) + } + }) + + // last synced chunk should not be removed + t.Run("get most recent synced chunk", func(t *testing.T) { + _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1]) + if err != nil { + t.Fatal(err) + } + }) + + // cleanup: drain the last testHookCollectGarbageChan + // element before calling deferred functions not to block + // collectGarbageWorker loop, preventing the race in + // setting testHookCollectGarbage function + select { + case <-testHookCollectGarbageChan: + default: + } +} + +// TestDB_collectGarbageWorker_withRequests is a helper test function +// to test garbage collection runs by uploading, syncing and +// requesting a number of chunks. +func TestDB_collectGarbageWorker_withRequests(t *testing.T) { + db, cleanupFunc := newTestDB(t, &Options{ + Capacity: 100, + }) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + syncer := db.NewSetter(ModeSetSync) + + testHookCollectGarbageChan := make(chan int64) + defer setTestHookCollectGarbage(func(collectedCount int64) { + testHookCollectGarbageChan <- collectedCount + })() + + addrs := make([]storage.Address, 0) + + // upload random chunks just up to the capacity + for i := 0; i < int(db.capacity)-1; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + err = syncer.Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + addrs = append(addrs, chunk.Address()) + } + + // request the latest synced chunk + // to prioritize it in the gc index + // not to be collected + _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + if err != nil { + t.Fatal(err) + } + + // upload and sync another chunk to trigger + // garbage collection + chunk := generateRandomChunk() + err = uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + err = syncer.Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + addrs = append(addrs, chunk.Address()) + + // wait for garbage collection + + gcTarget := db.gcTarget() + + var totalCollectedCount int64 + for { + select { + case c := <-testHookCollectGarbageChan: + totalCollectedCount += c + case <-time.After(10 * time.Second): + t.Error("collect garbage timeout") + } + gcSize := db.getGCSize() + if gcSize == gcTarget { + break + } + } + + wantTotalCollectedCount := int64(len(addrs)) - gcTarget + if totalCollectedCount != wantTotalCollectedCount { + t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount) + } + + t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget))) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget))) + + t.Run("gc size", newIndexGCSizeTest(db)) + + // requested chunk should not be removed + t.Run("get requested chunk", func(t *testing.T) { + _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) + if err != nil { + t.Fatal(err) + } + }) + + // the second synced chunk should be removed + t.Run("get gc-ed chunk", func(t *testing.T) { + _, err := db.NewGetter(ModeGetRequest).Get(addrs[1]) + if err != storage.ErrChunkNotFound { + t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound) + } + }) + + // last synced chunk should not be removed + t.Run("get most recent synced chunk", func(t *testing.T) { + _, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1]) + if err != nil { + t.Fatal(err) + } + }) +} + +// TestDB_gcSize checks if gcSize has a correct value after +// database is initialized with existing data. +func TestDB_gcSize(t *testing.T) { + dir, err := ioutil.TempDir("", "localstore-stored-gc-size") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + baseKey := make([]byte, 32) + if _, err := rand.Read(baseKey); err != nil { + t.Fatal(err) + } + db, err := New(dir, baseKey, nil) + if err != nil { + t.Fatal(err) + } + + uploader := db.NewPutter(ModePutUpload) + syncer := db.NewSetter(ModeSetSync) + + count := 100 + + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + err = syncer.Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + } + + // DB.Close writes gc size to disk, so + // Instead calling Close, simulate database shutdown + // without it. + close(db.close) + db.updateGCWG.Wait() + err = db.shed.Close() + if err != nil { + t.Fatal(err) + } + + db, err = New(dir, baseKey, nil) + if err != nil { + t.Fatal(err) + } + + t.Run("gc index size", newIndexGCSizeTest(db)) + + t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0)) +} + +// setTestHookCollectGarbage sets testHookCollectGarbage and +// returns a function that will reset it to the +// value before the change. +func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) { + current := testHookCollectGarbage + reset = func() { testHookCollectGarbage = current } + testHookCollectGarbage = h + return reset +} + +// TestSetTestHookCollectGarbage tests if setTestHookCollectGarbage changes +// testHookCollectGarbage function correctly and if its reset function +// resets the original function. +func TestSetTestHookCollectGarbage(t *testing.T) { + // Set the current function after the test finishes. + defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage) + + // expected value for the unchanged function + original := 1 + // expected value for the changed function + changed := 2 + + // this variable will be set with two different functions + var got int + + // define the original (unchanged) functions + testHookCollectGarbage = func(_ int64) { + got = original + } + + // set got variable + testHookCollectGarbage(0) + + // test if got variable is set correctly + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } + + // set the new function + reset := setTestHookCollectGarbage(func(_ int64) { + got = changed + }) + + // set got variable + testHookCollectGarbage(0) + + // test if got variable is set correctly to changed value + if got != changed { + t.Errorf("got hook value %v, want %v", got, changed) + } + + // set the function to the original one + reset() + + // set got variable + testHookCollectGarbage(0) + + // test if got variable is set correctly to original value + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } +} diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go new file mode 100644 index 000000000..d9abf440f --- /dev/null +++ b/swarm/storage/localstore/index_test.go @@ -0,0 +1,227 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "math/rand" + "testing" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// TestDB_pullIndex validates the ordering of keys in pull index. +// Pull index key contains PO prefix which is calculated from +// DB base key and chunk address. This is not an Item field +// which are checked in Mode tests. +// This test uploads chunks, sorts them in expected order and +// validates that pull index iterator will iterate it the same +// order. +func TestDB_pullIndex(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + chunkCount := 50 + + chunks := make([]testIndexChunk, chunkCount) + + // upload random chunks + for i := 0; i < chunkCount; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + chunks[i] = testIndexChunk{ + Chunk: chunk, + // this timestamp is not the same as in + // the index, but given that uploads + // are sequential and that only ordering + // of events matter, this information is + // sufficient + storeTimestamp: now(), + } + } + + testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) { + poi := storage.Proximity(db.baseKey, chunks[i].Address()) + poj := storage.Proximity(db.baseKey, chunks[j].Address()) + if poi < poj { + return true + } + if poi > poj { + return false + } + if chunks[i].storeTimestamp < chunks[j].storeTimestamp { + return true + } + if chunks[i].storeTimestamp > chunks[j].storeTimestamp { + return false + } + return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1 + }) +} + +// TestDB_gcIndex validates garbage collection index by uploading +// a chunk with and performing operations using synced, access and +// request modes. +func TestDB_gcIndex(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + chunkCount := 50 + + chunks := make([]testIndexChunk, chunkCount) + + // upload random chunks + for i := 0; i < chunkCount; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + chunks[i] = testIndexChunk{ + Chunk: chunk, + } + } + + // check if all chunks are stored + newItemsCountTest(db.pullIndex, chunkCount)(t) + + // check that chunks are not collectable for garbage + newItemsCountTest(db.gcIndex, 0)(t) + + // set update gc test hook to signal when + // update gc goroutine is done by sending to + // testHookUpdateGCChan channel, which is + // used to wait for indexes change verifications + testHookUpdateGCChan := make(chan struct{}) + defer setTestHookUpdateGC(func() { + testHookUpdateGCChan <- struct{}{} + })() + + t.Run("request unsynced", func(t *testing.T) { + chunk := chunks[1] + + _, err := db.NewGetter(ModeGetRequest).Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + // the chunk is not synced + // should not be in the garbace collection index + newItemsCountTest(db.gcIndex, 0)(t) + + newIndexGCSizeTest(db)(t) + }) + + t.Run("sync one chunk", func(t *testing.T) { + chunk := chunks[0] + + err := db.NewSetter(ModeSetSync).Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + // the chunk is synced and should be in gc index + newItemsCountTest(db.gcIndex, 1)(t) + + newIndexGCSizeTest(db)(t) + }) + + t.Run("sync all chunks", func(t *testing.T) { + setter := db.NewSetter(ModeSetSync) + + for i := range chunks { + err := setter.Set(chunks[i].Address()) + if err != nil { + t.Fatal(err) + } + } + + testItemsOrder(t, db.gcIndex, chunks, nil) + + newIndexGCSizeTest(db)(t) + }) + + t.Run("request one chunk", func(t *testing.T) { + i := 6 + + _, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + // move the chunk to the end of the expected gc + c := chunks[i] + chunks = append(chunks[:i], chunks[i+1:]...) + chunks = append(chunks, c) + + testItemsOrder(t, db.gcIndex, chunks, nil) + + newIndexGCSizeTest(db)(t) + }) + + t.Run("random chunk request", func(t *testing.T) { + requester := db.NewGetter(ModeGetRequest) + + rand.Shuffle(len(chunks), func(i, j int) { + chunks[i], chunks[j] = chunks[j], chunks[i] + }) + + for _, chunk := range chunks { + _, err := requester.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + } + + testItemsOrder(t, db.gcIndex, chunks, nil) + + newIndexGCSizeTest(db)(t) + }) + + t.Run("remove one chunk", func(t *testing.T) { + i := 3 + + err := db.NewSetter(modeSetRemove).Set(chunks[i].Address()) + if err != nil { + t.Fatal(err) + } + + // remove the chunk from the expected chunks in gc index + chunks = append(chunks[:i], chunks[i+1:]...) + + testItemsOrder(t, db.gcIndex, chunks, nil) + + newIndexGCSizeTest(db)(t) + }) +} diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go new file mode 100644 index 000000000..7a9fb54f5 --- /dev/null +++ b/swarm/storage/localstore/localstore.go @@ -0,0 +1,431 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/storage/mock" +) + +var ( + // ErrInvalidMode is retuned when an unknown Mode + // is provided to the function. + ErrInvalidMode = errors.New("invalid mode") + // ErrAddressLockTimeout is returned when the same chunk + // is updated in parallel and one of the updates + // takes longer then the configured timeout duration. + ErrAddressLockTimeout = errors.New("address lock timeout") +) + +var ( + // Default value for Capacity DB option. + defaultCapacity int64 = 5000000 + // Limit the number of goroutines created by Getters + // that call updateGC function. Value 0 sets no limit. + maxParallelUpdateGC = 1000 +) + +// DB is the local store implementation and holds +// database related objects. +type DB struct { + shed *shed.DB + + // schema name of loaded data + schemaName shed.StringField + // field that stores number of intems in gc index + storedGCSize shed.Uint64Field + + // retrieval indexes + retrievalDataIndex shed.Index + retrievalAccessIndex shed.Index + // push syncing index + pushIndex shed.Index + // push syncing subscriptions triggers + pushTriggers []chan struct{} + pushTriggersMu sync.RWMutex + + // pull syncing index + pullIndex shed.Index + // pull syncing subscriptions triggers per bin + pullTriggers map[uint8][]chan struct{} + pullTriggersMu sync.RWMutex + + // garbage collection index + gcIndex shed.Index + // index that stores hashes that are not + // counted in and saved to storedGCSize + gcUncountedHashesIndex shed.Index + + // number of elements in garbage collection index + // it must be always read by getGCSize and + // set with incGCSize which are locking gcSizeMu + gcSize int64 + gcSizeMu sync.RWMutex + // garbage collection is triggered when gcSize exceeds + // the capacity value + capacity int64 + + // triggers garbage collection event loop + collectGarbageTrigger chan struct{} + // triggers write gc size event loop + writeGCSizeTrigger chan struct{} + + // a buffered channel acting as a semaphore + // to limit the maximal number of goroutines + // created by Getters to call updateGC function + updateGCSem chan struct{} + // a wait group to ensure all updateGC goroutines + // are done before closing the database + updateGCWG sync.WaitGroup + + baseKey []byte + + addressLocks sync.Map + + // this channel is closed when close function is called + // to terminate other goroutines + close chan struct{} +} + +// Options struct holds optional parameters for configuring DB. +type Options struct { + // MockStore is a mock node store that is used to store + // chunk data in a central store. It can be used to reduce + // total storage space requirements in testing large number + // of swarm nodes with chunk data deduplication provided by + // the mock global store. + MockStore *mock.NodeStore + // Capacity is a limit that triggers garbage collection when + // number of items in gcIndex equals or exceeds it. + Capacity int64 + // MetricsPrefix defines a prefix for metrics names. + MetricsPrefix string +} + +// New returns a new DB. All fields and indexes are initialized +// and possible conflicts with schema from existing database is checked. +// One goroutine for writing batches is created. +func New(path string, baseKey []byte, o *Options) (db *DB, err error) { + if o == nil { + o = new(Options) + } + db = &DB{ + capacity: o.Capacity, + baseKey: baseKey, + // channels collectGarbageTrigger and writeGCSizeTrigger + // need to be buffered with the size of 1 + // to signal another event if it + // is triggered during already running function + collectGarbageTrigger: make(chan struct{}, 1), + writeGCSizeTrigger: make(chan struct{}, 1), + close: make(chan struct{}), + } + if db.capacity <= 0 { + db.capacity = defaultCapacity + } + if maxParallelUpdateGC > 0 { + db.updateGCSem = make(chan struct{}, maxParallelUpdateGC) + } + + db.shed, err = shed.NewDB(path, o.MetricsPrefix) + if err != nil { + return nil, err + } + // Identify current storage schema by arbitrary name. + db.schemaName, err = db.shed.NewStringField("schema-name") + if err != nil { + return nil, err + } + // Persist gc size. + db.storedGCSize, err = db.shed.NewUint64Field("gc-size") + if err != nil { + return nil, err + } + // Functions for retrieval data index. + var ( + encodeValueFunc func(fields shed.Item) (value []byte, err error) + decodeValueFunc func(keyItem shed.Item, value []byte) (e shed.Item, err error) + ) + if o.MockStore != nil { + encodeValueFunc = func(fields shed.Item) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + err = o.MockStore.Put(fields.Address, fields.Data) + if err != nil { + return nil, err + } + return b, nil + } + decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data, err = o.MockStore.Get(keyItem.Address) + return e, err + } + } else { + encodeValueFunc = func(fields shed.Item) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) + return value, nil + } + decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data = value[8:] + return e, nil + } + } + // Index storing actual chunk address, data and store timestamp. + db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: encodeValueFunc, + DecodeValue: decodeValueFunc, + }) + if err != nil { + return nil, err + } + // Index storing access timestamp for a particular address. + // It is needed in order to update gc index keys for iteration order. + db.retrievalAccessIndex, err = db.shed.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) + return b, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + return e, nil + }, + }) + if err != nil { + return nil, err + } + // pull index allows history and live syncing per po bin + db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + key = make([]byte, 41) + key[0] = db.po(fields.Address) + binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp)) + copy(key[9:], fields.Address[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key[9:] + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9])) + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // create a pull syncing triggers used by SubscribePull function + db.pullTriggers = make(map[uint8][]chan struct{}) + // push index contains as yet unsynced chunks + db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + key = make([]byte, 40) + binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp)) + copy(key[8:], fields.Address[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key[8:] + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // create a push syncing triggers used by SubscribePush function + db.pushTriggers = make([]chan struct{}, 0) + // gc index for removable chunk ordered by ascending last access time + db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + b := make([]byte, 16, 16+len(fields.Address)) + binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + key = append(b, fields.Address...) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.Address = key[16:] + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // gc uncounted hashes index keeps hashes that are in gc index + // but not counted in and saved to storedGCSize + db.gcUncountedHashesIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + + // count number of elements in garbage collection index + gcSize, err := db.storedGCSize.Get() + if err != nil { + return nil, err + } + // get number of uncounted hashes + gcUncountedSize, err := db.gcUncountedHashesIndex.Count() + if err != nil { + return nil, err + } + gcSize += uint64(gcUncountedSize) + // remove uncounted hashes from the index and + // save the total gcSize after uncounted hashes are removed + err = db.writeGCSize(int64(gcSize)) + if err != nil { + return nil, err + } + db.incGCSize(int64(gcSize)) + + // start worker to write gc size + go db.writeGCSizeWorker() + // start garbage collection worker + go db.collectGarbageWorker() + return db, nil +} + +// Close closes the underlying database. +func (db *DB) Close() (err error) { + close(db.close) + db.updateGCWG.Wait() + if err := db.writeGCSize(db.getGCSize()); err != nil { + log.Error("localstore: write gc size", "err", err) + } + return db.shed.Close() +} + +// po computes the proximity order between the address +// and database base key. +func (db *DB) po(addr storage.Address) (bin uint8) { + return uint8(storage.Proximity(db.baseKey, addr)) +} + +var ( + // Maximal time for lockAddr to wait until it + // returns error. + addressLockTimeout = 3 * time.Second + // duration between two lock checks in lockAddr. + addressLockCheckDelay = 30 * time.Microsecond +) + +// lockAddr sets the lock on a particular address +// using addressLocks sync.Map and returns unlock function. +// If the address is locked this function will check it +// in a for loop for addressLockTimeout time, after which +// it will return ErrAddressLockTimeout error. +func (db *DB) lockAddr(addr storage.Address) (unlock func(), err error) { + start := time.Now() + lockKey := hex.EncodeToString(addr) + for { + _, loaded := db.addressLocks.LoadOrStore(lockKey, struct{}{}) + if !loaded { + break + } + time.Sleep(addressLockCheckDelay) + if time.Since(start) > addressLockTimeout { + return nil, ErrAddressLockTimeout + } + } + return func() { db.addressLocks.Delete(lockKey) }, nil +} + +// chunkToItem creates new Item with data provided by the Chunk. +func chunkToItem(ch storage.Chunk) shed.Item { + return shed.Item{ + Address: ch.Address(), + Data: ch.Data(), + } +} + +// addressToItem creates new Item with a provided address. +func addressToItem(addr storage.Address) shed.Item { + return shed.Item{ + Address: addr, + } +} + +// now is a helper function that returns a current unix timestamp +// in UTC timezone. +// It is set in the init function for usage in production, and +// optionally overridden in tests for data validation. +var now func() int64 + +func init() { + // set the now function + now = func() (t int64) { + return time.Now().UTC().UnixNano() + } +} diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go new file mode 100644 index 000000000..c7309d3cd --- /dev/null +++ b/swarm/storage/localstore/localstore_test.go @@ -0,0 +1,520 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "sort" + "strconv" + "sync" + "testing" + "time" + + ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// TestDB validates if the chunk can be uploaded and +// correctly retrieved. +func TestDB(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(got.Address(), chunk.Address()) { + t.Errorf("got address %x, want %x", got.Address(), chunk.Address()) + } + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Errorf("got data %x, want %x", got.Data(), chunk.Data()) + } +} + +// TestDB_updateGCSem tests maxParallelUpdateGC limit. +// This test temporary sets the limit to a low number, +// makes updateGC function execution time longer by +// setting a custom testHookUpdateGC function with a sleep +// and a count current and maximal number of goroutines. +func TestDB_updateGCSem(t *testing.T) { + updateGCSleep := time.Second + var count int + var max int + var mu sync.Mutex + defer setTestHookUpdateGC(func() { + mu.Lock() + // add to the count of current goroutines + count++ + if count > max { + // set maximal detected numbers of goroutines + max = count + } + mu.Unlock() + + // wait for some time to ensure multiple parallel goroutines + time.Sleep(updateGCSleep) + + mu.Lock() + count-- + mu.Unlock() + })() + + defer func(m int) { maxParallelUpdateGC = m }(maxParallelUpdateGC) + maxParallelUpdateGC = 3 + + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + getter := db.NewGetter(ModeGetRequest) + + // get more chunks then maxParallelUpdateGC + // in time shorter then updateGCSleep + for i := 0; i < 5; i++ { + _, err = getter.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + } + + if max != maxParallelUpdateGC { + t.Errorf("got max %v, want %v", max, maxParallelUpdateGC) + } +} + +// BenchmarkNew measures the time that New function +// needs to initialize and count the number of key/value +// pairs in GC index. +// This benchmark generates a number of chunks, uploads them, +// sets them to synced state for them to enter the GC index, +// and measures the execution time of New function by creating +// new databases with the same data directory. +// +// This benchmark takes significant amount of time. +// +// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show +// that New function executes around 1s for database with 1M chunks. +// +// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkNew -v -timeout 20m +// goos: darwin +// goarch: amd64 +// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore +// BenchmarkNew/1000-8 200 11672414 ns/op 9570960 B/op 10008 allocs/op +// BenchmarkNew/10000-8 100 14890609 ns/op 10490118 B/op 7759 allocs/op +// BenchmarkNew/100000-8 20 58334080 ns/op 17763157 B/op 22978 allocs/op +// BenchmarkNew/1000000-8 2 748595153 ns/op 45297404 B/op 253242 allocs/op +// PASS +func BenchmarkNew(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + for _, count := range []int{ + 1000, + 10000, + 100000, + 1000000, + } { + b.Run(strconv.Itoa(count), func(b *testing.B) { + dir, err := ioutil.TempDir("", "localstore-new-benchmark") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(dir) + baseKey := make([]byte, 32) + if _, err := rand.Read(baseKey); err != nil { + b.Fatal(err) + } + db, err := New(dir, baseKey, nil) + if err != nil { + b.Fatal(err) + } + uploader := db.NewPutter(ModePutUpload) + syncer := db.NewSetter(ModeSetSync) + for i := 0; i < count; i++ { + chunk := generateFakeRandomChunk() + err := uploader.Put(chunk) + if err != nil { + b.Fatal(err) + } + err = syncer.Set(chunk.Address()) + if err != nil { + b.Fatal(err) + } + } + err = db.Close() + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + + for n := 0; n < b.N; n++ { + b.StartTimer() + db, err := New(dir, baseKey, nil) + b.StopTimer() + + if err != nil { + b.Fatal(err) + } + err = db.Close() + if err != nil { + b.Fatal(err) + } + } + }) + } +} + +// newTestDB is a helper function that constructs a +// temporary database and returns a cleanup function that must +// be called to remove the data. +func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) { + t.Helper() + + dir, err := ioutil.TempDir("", "localstore-test") + if err != nil { + t.Fatal(err) + } + cleanupFunc = func() { os.RemoveAll(dir) } + baseKey := make([]byte, 32) + if _, err := rand.Read(baseKey); err != nil { + t.Fatal(err) + } + db, err = New(dir, baseKey, o) + if err != nil { + cleanupFunc() + t.Fatal(err) + } + cleanupFunc = func() { + err := db.Close() + if err != nil { + t.Error(err) + } + os.RemoveAll(dir) + } + return db, cleanupFunc +} + +// generateRandomChunk generates a valid Chunk with +// data size of default chunk size. +func generateRandomChunk() storage.Chunk { + return storage.GenerateRandomChunk(ch.DefaultSize) +} + +func init() { + // needed for generateFakeRandomChunk + rand.Seed(time.Now().UnixNano()) +} + +// generateFakeRandomChunk generates a Chunk that is not +// valid, but it contains a random key and a random value. +// This function is faster then storage.GenerateRandomChunk +// which generates a valid chunk. +// Some tests in this package do not need valid chunks, just +// random data, and their execution time can be decreased +// using this function. +func generateFakeRandomChunk() storage.Chunk { + data := make([]byte, ch.DefaultSize) + rand.Read(data) + key := make([]byte, 32) + rand.Read(key) + return storage.NewChunk(key, data) +} + +// TestGenerateFakeRandomChunk validates that +// generateFakeRandomChunk returns random data by comparing +// two generated chunks. +func TestGenerateFakeRandomChunk(t *testing.T) { + c1 := generateFakeRandomChunk() + c2 := generateFakeRandomChunk() + addrLen := len(c1.Address()) + if addrLen != 32 { + t.Errorf("first chunk address length %v, want %v", addrLen, 32) + } + dataLen := len(c1.Data()) + if dataLen != ch.DefaultSize { + t.Errorf("first chunk data length %v, want %v", dataLen, ch.DefaultSize) + } + addrLen = len(c2.Address()) + if addrLen != 32 { + t.Errorf("second chunk address length %v, want %v", addrLen, 32) + } + dataLen = len(c2.Data()) + if dataLen != ch.DefaultSize { + t.Errorf("second chunk data length %v, want %v", dataLen, ch.DefaultSize) + } + if bytes.Equal(c1.Address(), c2.Address()) { + t.Error("fake chunks addresses do not differ") + } + if bytes.Equal(c1.Data(), c2.Data()) { + t.Error("fake chunks data bytes do not differ") + } +} + +// newRetrieveIndexesTest returns a test function that validates if the right +// chunk values are in the retrieval indexes. +func newRetrieveIndexesTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { + return func(t *testing.T) { + item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) + if err != nil { + t.Fatal(err) + } + validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0) + + // access index should not be set + wantErr := leveldb.ErrNotFound + item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) + if err != wantErr { + t.Errorf("got error %v, want %v", err, wantErr) + } + } +} + +// newRetrieveIndexesTestWithAccess returns a test function that validates if the right +// chunk values are in the retrieval indexes when access time must be stored. +func newRetrieveIndexesTestWithAccess(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { + return func(t *testing.T) { + item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) + if err != nil { + t.Fatal(err) + } + validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0) + + if accessTimestamp > 0 { + item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) + if err != nil { + t.Fatal(err) + } + validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp) + } + } +} + +// newPullIndexTest returns a test function that validates if the right +// chunk values are in the pull index. +func newPullIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { + return func(t *testing.T) { + item, err := db.pullIndex.Get(shed.Item{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + }) + if err != wantError { + t.Errorf("got error %v, want %v", err, wantError) + } + if err == nil { + validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0) + } + } +} + +// newPushIndexTest returns a test function that validates if the right +// chunk values are in the push index. +func newPushIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { + return func(t *testing.T) { + item, err := db.pushIndex.Get(shed.Item{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + }) + if err != wantError { + t.Errorf("got error %v, want %v", err, wantError) + } + if err == nil { + validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0) + } + } +} + +// newGCIndexTest returns a test function that validates if the right +// chunk values are in the push index. +func newGCIndexTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { + return func(t *testing.T) { + item, err := db.gcIndex.Get(shed.Item{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + AccessTimestamp: accessTimestamp, + }) + if err != nil { + t.Fatal(err) + } + validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp) + } +} + +// newItemsCountTest returns a test function that validates if +// an index contains expected number of key/value pairs. +func newItemsCountTest(i shed.Index, want int) func(t *testing.T) { + return func(t *testing.T) { + var c int + err := i.Iterate(func(item shed.Item) (stop bool, err error) { + c++ + return + }, nil) + if err != nil { + t.Fatal(err) + } + if c != want { + t.Errorf("got %v items in index, want %v", c, want) + } + } +} + +// newIndexGCSizeTest retruns a test function that validates if DB.gcSize +// value is the same as the number of items in DB.gcIndex. +func newIndexGCSizeTest(db *DB) func(t *testing.T) { + return func(t *testing.T) { + var want int64 + err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { + want++ + return + }, nil) + if err != nil { + t.Fatal(err) + } + got := db.getGCSize() + if got != want { + t.Errorf("got gc size %v, want %v", got, want) + } + } +} + +// testIndexChunk embeds storageChunk with additional data that is stored +// in database. It is used for index values validations. +type testIndexChunk struct { + storage.Chunk + storeTimestamp int64 +} + +// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil, +// chunks will be sorted with it before validation. +func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) { + newItemsCountTest(i, len(chunks))(t) + + if sortFunc != nil { + sort.Slice(chunks, sortFunc) + } + + var cursor int + err := i.Iterate(func(item shed.Item) (stop bool, err error) { + want := chunks[cursor].Address() + got := item.Address + if !bytes.Equal(got, want) { + return true, fmt.Errorf("got address %x at position %v, want %x", got, cursor, want) + } + cursor++ + return false, nil + }, nil) + if err != nil { + t.Fatal(err) + } +} + +// validateItem is a helper function that checks Item values. +func validateItem(t *testing.T, item shed.Item, address, data []byte, storeTimestamp, accessTimestamp int64) { + t.Helper() + + if !bytes.Equal(item.Address, address) { + t.Errorf("got item address %x, want %x", item.Address, address) + } + if !bytes.Equal(item.Data, data) { + t.Errorf("got item data %x, want %x", item.Data, data) + } + if item.StoreTimestamp != storeTimestamp { + t.Errorf("got item store timestamp %v, want %v", item.StoreTimestamp, storeTimestamp) + } + if item.AccessTimestamp != accessTimestamp { + t.Errorf("got item access timestamp %v, want %v", item.AccessTimestamp, accessTimestamp) + } +} + +// setNow replaces now function and +// returns a function that will reset it to the +// value before the change. +func setNow(f func() int64) (reset func()) { + current := now + reset = func() { now = current } + now = f + return reset +} + +// TestSetNow tests if setNow function changes now function +// correctly and if its reset function resets the original function. +func TestSetNow(t *testing.T) { + // set the current function after the test finishes + defer func(f func() int64) { now = f }(now) + + // expected value for the unchanged function + var original int64 = 1 + // expected value for the changed function + var changed int64 = 2 + + // define the original (unchanged) functions + now = func() int64 { + return original + } + + // get the time + got := now() + + // test if got variable is set correctly + if got != original { + t.Errorf("got now value %v, want %v", got, original) + } + + // set the new function + reset := setNow(func() int64 { + return changed + }) + + // get the time + got = now() + + // test if got variable is set correctly to changed value + if got != changed { + t.Errorf("got hook value %v, want %v", got, changed) + } + + // set the function to the original one + reset() + + // get the time + got = now() + + // test if got variable is set correctly to original value + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } +} diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go new file mode 100644 index 000000000..3a69f6e9d --- /dev/null +++ b/swarm/storage/localstore/mode_get.go @@ -0,0 +1,154 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// ModeGet enumerates different Getter modes. +type ModeGet int + +// Getter modes. +const ( + // ModeGetRequest: when accessed for retrieval + ModeGetRequest ModeGet = iota + // ModeGetSync: when accessed for syncing or proof of custody request + ModeGetSync +) + +// Getter provides Get method to retrieve Chunks +// from database. +type Getter struct { + db *DB + mode ModeGet +} + +// NewGetter returns a new Getter on database +// with a specific Mode. +func (db *DB) NewGetter(mode ModeGet) *Getter { + return &Getter{ + mode: mode, + db: db, + } +} + +// Get returns a chunk from the database. If the chunk is +// not found storage.ErrChunkNotFound will be returned. +// All required indexes will be updated required by the +// Getter Mode. +func (g *Getter) Get(addr storage.Address) (chunk storage.Chunk, err error) { + out, err := g.db.get(g.mode, addr) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, storage.ErrChunkNotFound + } + return nil, err + } + return storage.NewChunk(out.Address, out.Data), nil +} + +// get returns Item from the retrieval index +// and updates other indexes. +func (db *DB) get(mode ModeGet, addr storage.Address) (out shed.Item, err error) { + item := addressToItem(addr) + + out, err = db.retrievalDataIndex.Get(item) + if err != nil { + return out, err + } + switch mode { + // update the access timestamp and gc index + case ModeGetRequest: + if db.updateGCSem != nil { + // wait before creating new goroutines + // if updateGCSem buffer id full + db.updateGCSem <- struct{}{} + } + db.updateGCWG.Add(1) + go func() { + defer db.updateGCWG.Done() + if db.updateGCSem != nil { + // free a spot in updateGCSem buffer + // for a new goroutine + defer func() { <-db.updateGCSem }() + } + err := db.updateGC(out) + if err != nil { + log.Error("localstore update gc", "err", err) + } + // if gc update hook is defined, call it + if testHookUpdateGC != nil { + testHookUpdateGC() + } + }() + + // no updates to indexes + case ModeGetSync: + default: + return out, ErrInvalidMode + } + return out, nil +} + +// updateGC updates garbage collection index for +// a single item. Provided item is expected to have +// only Address and Data fields with non zero values, +// which is ensured by the get function. +func (db *DB) updateGC(item shed.Item) (err error) { + unlock, err := db.lockAddr(item.Address) + if err != nil { + return err + } + defer unlock() + + batch := new(leveldb.Batch) + + // update accessTimeStamp in retrieve, gc + + i, err := db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + case leveldb.ErrNotFound: + // no chunk accesses + default: + return err + } + if item.AccessTimestamp == 0 { + // chunk is not yet synced + // do not add it to the gc index + return nil + } + // delete current entry from the gc index + db.gcIndex.DeleteInBatch(batch, item) + // update access timestamp + item.AccessTimestamp = now() + // update retrieve access index + db.retrievalAccessIndex.PutInBatch(batch, item) + // add new entry to gc index + db.gcIndex.PutInBatch(batch, item) + + return db.shed.WriteBatch(batch) +} + +// testHookUpdateGC is a hook that can provide +// information when a garbage collection index is updated. +var testHookUpdateGC func() diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go new file mode 100644 index 000000000..6615a3b88 --- /dev/null +++ b/swarm/storage/localstore/mode_get_test.go @@ -0,0 +1,237 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "testing" + "time" +) + +// TestModeGetRequest validates ModeGetRequest index values on the provided DB. +func TestModeGetRequest(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploadTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return uploadTimestamp + })() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + requester := db.NewGetter(ModeGetRequest) + + // set update gc test hook to signal when + // update gc goroutine is done by sending to + // testHookUpdateGCChan channel, which is + // used to wait for garbage colletion index + // changes + testHookUpdateGCChan := make(chan struct{}) + defer setTestHookUpdateGC(func() { + testHookUpdateGCChan <- struct{}{} + })() + + t.Run("get unsynced", func(t *testing.T) { + got, err := requester.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + if !bytes.Equal(got.Address(), chunk.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + } + + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) + + // set chunk to synced state + err = db.NewSetter(ModeSetSync).Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + t.Run("first get", func(t *testing.T) { + got, err := requester.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + if !bytes.Equal(got.Address(), chunk.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + } + + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp)) + + t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) + + t.Run("second get", func(t *testing.T) { + accessTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return accessTimestamp + })() + + got, err := requester.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + + if !bytes.Equal(got.Address(), chunk.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + } + + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp)) + + t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) +} + +// TestModeGetSync validates ModeGetSync index values on the provided DB. +func TestModeGetSync(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploadTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return uploadTimestamp + })() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + got, err := db.NewGetter(ModeGetSync).Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(got.Address(), chunk.Address()) { + t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address()) + } + + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data()) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) + + t.Run("gc size", newIndexGCSizeTest(db)) +} + +// setTestHookUpdateGC sets testHookUpdateGC and +// returns a function that will reset it to the +// value before the change. +func setTestHookUpdateGC(h func()) (reset func()) { + current := testHookUpdateGC + reset = func() { testHookUpdateGC = current } + testHookUpdateGC = h + return reset +} + +// TestSetTestHookUpdateGC tests if setTestHookUpdateGC changes +// testHookUpdateGC function correctly and if its reset function +// resets the original function. +func TestSetTestHookUpdateGC(t *testing.T) { + // Set the current function after the test finishes. + defer func(h func()) { testHookUpdateGC = h }(testHookUpdateGC) + + // expected value for the unchanged function + original := 1 + // expected value for the changed function + changed := 2 + + // this variable will be set with two different functions + var got int + + // define the original (unchanged) functions + testHookUpdateGC = func() { + got = original + } + + // set got variable + testHookUpdateGC() + + // test if got variable is set correctly + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } + + // set the new function + reset := setTestHookUpdateGC(func() { + got = changed + }) + + // set got variable + testHookUpdateGC() + + // test if got variable is set correctly to changed value + if got != changed { + t.Errorf("got hook value %v, want %v", got, changed) + } + + // set the function to the original one + reset() + + // set got variable + testHookUpdateGC() + + // test if got variable is set correctly to original value + if got != original { + t.Errorf("got hook value %v, want %v", got, original) + } +} diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go new file mode 100644 index 000000000..1a5a3d1b1 --- /dev/null +++ b/swarm/storage/localstore/mode_put.go @@ -0,0 +1,160 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// ModePut enumerates different Putter modes. +type ModePut int + +// Putter modes. +const ( + // ModePutRequest: when a chunk is received as a result of retrieve request and delivery + ModePutRequest ModePut = iota + // ModePutSync: when a chunk is received via syncing + ModePutSync + // ModePutUpload: when a chunk is created by local upload + ModePutUpload +) + +// Putter provides Put method to store Chunks +// to database. +type Putter struct { + db *DB + mode ModePut +} + +// NewPutter returns a new Putter on database +// with a specific Mode. +func (db *DB) NewPutter(mode ModePut) *Putter { + return &Putter{ + mode: mode, + db: db, + } +} + +// Put stores the Chunk to database and depending +// on the Putter mode, it updates required indexes. +func (p *Putter) Put(ch storage.Chunk) (err error) { + return p.db.put(p.mode, chunkToItem(ch)) +} + +// put stores Item to database and updates other +// indexes. It acquires lockAddr to protect two calls +// of this function for the same address in parallel. +// Item fields Address and Data must not be +// with their nil values. +func (db *DB) put(mode ModePut, item shed.Item) (err error) { + // protect parallel updates + unlock, err := db.lockAddr(item.Address) + if err != nil { + return err + } + defer unlock() + + batch := new(leveldb.Batch) + + // variables that provide information for operations + // to be done after write batch function successfully executes + var gcSizeChange int64 // number to add or subtract from gcSize + var triggerPullFeed bool // signal pull feed subscriptions to iterate + var triggerPushFeed bool // signal push feed subscriptions to iterate + + switch mode { + case ModePutRequest: + // put to indexes: retrieve, gc; it does not enter the syncpool + + // check if the chunk already is in the database + // as gc index is updated + i, err := db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + case leveldb.ErrNotFound: + // no chunk accesses + default: + return err + } + i, err = db.retrievalDataIndex.Get(item) + switch err { + case nil: + item.StoreTimestamp = i.StoreTimestamp + case leveldb.ErrNotFound: + // no chunk accesses + default: + return err + } + if item.AccessTimestamp != 0 { + // delete current entry from the gc index + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + } + if item.StoreTimestamp == 0 { + item.StoreTimestamp = now() + } + // update access timestamp + item.AccessTimestamp = now() + // update retrieve access index + db.retrievalAccessIndex.PutInBatch(batch, item) + // add new entry to gc index + db.gcIndex.PutInBatch(batch, item) + db.gcUncountedHashesIndex.PutInBatch(batch, item) + gcSizeChange++ + + db.retrievalDataIndex.PutInBatch(batch, item) + + case ModePutUpload: + // put to indexes: retrieve, push, pull + + item.StoreTimestamp = now() + db.retrievalDataIndex.PutInBatch(batch, item) + db.pullIndex.PutInBatch(batch, item) + triggerPullFeed = true + db.pushIndex.PutInBatch(batch, item) + triggerPushFeed = true + + case ModePutSync: + // put to indexes: retrieve, pull + + item.StoreTimestamp = now() + db.retrievalDataIndex.PutInBatch(batch, item) + db.pullIndex.PutInBatch(batch, item) + triggerPullFeed = true + + default: + return ErrInvalidMode + } + + err = db.shed.WriteBatch(batch) + if err != nil { + return err + } + if gcSizeChange != 0 { + db.incGCSize(gcSizeChange) + } + if triggerPullFeed { + db.triggerPullSubscriptions(db.po(item.Address)) + } + if triggerPushFeed { + db.triggerPushSubscriptions() + } + return nil +} diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go new file mode 100644 index 000000000..ffe6a4cb4 --- /dev/null +++ b/swarm/storage/localstore/mode_put_test.go @@ -0,0 +1,300 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// TestModePutRequest validates ModePutRequest index values on the provided DB. +func TestModePutRequest(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + putter := db.NewPutter(ModePutRequest) + + chunk := generateRandomChunk() + + // keep the record when the chunk is stored + var storeTimestamp int64 + + t.Run("first put", func(t *testing.T) { + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + storeTimestamp = wantTimestamp + + err := putter.Put(chunk) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) + + t.Run("second put", func(t *testing.T) { + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + err := putter.Put(chunk) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) + }) +} + +// TestModePutSync validates ModePutSync index values on the provided DB. +func TestModePutSync(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutSync).Put(chunk) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0)) + + t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil)) +} + +// TestModePutUpload validates ModePutUpload index values on the provided DB. +func TestModePutUpload(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0)) + + t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil)) + + t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil)) +} + +// TestModePutUpload_parallel uploads chunks in parallel +// and validates if all chunks can be retrieved with correct data. +func TestModePutUpload_parallel(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunkCount := 1000 + workerCount := 100 + + chunkChan := make(chan storage.Chunk) + errChan := make(chan error) + doneChan := make(chan struct{}) + defer close(doneChan) + + // start uploader workers + for i := 0; i < workerCount; i++ { + go func(i int) { + uploader := db.NewPutter(ModePutUpload) + for { + select { + case chunk, ok := <-chunkChan: + if !ok { + return + } + err := uploader.Put(chunk) + select { + case errChan <- err: + case <-doneChan: + } + case <-doneChan: + return + } + } + }(i) + } + + chunks := make([]storage.Chunk, 0) + var chunksMu sync.Mutex + + // send chunks to workers + go func() { + for i := 0; i < chunkCount; i++ { + chunk := generateRandomChunk() + select { + case chunkChan <- chunk: + case <-doneChan: + return + } + chunksMu.Lock() + chunks = append(chunks, chunk) + chunksMu.Unlock() + } + + close(chunkChan) + }() + + // validate every error from workers + for i := 0; i < chunkCount; i++ { + err := <-errChan + if err != nil { + t.Fatal(err) + } + } + + // get every chunk and validate its data + getter := db.NewGetter(ModeGetRequest) + + chunksMu.Lock() + defer chunksMu.Unlock() + for _, chunk := range chunks { + got, err := getter.Get(chunk.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got.Data(), chunk.Data()) { + t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data()) + } + } +} + +// BenchmarkPutUpload runs a series of benchmarks that upload +// a specific number of chunks in parallel. +// +// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) +// +// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkPutUpload -v +// +// goos: darwin +// goarch: amd64 +// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore +// BenchmarkPutUpload/count_100_parallel_1-8 300 5107704 ns/op 2081461 B/op 2374 allocs/op +// BenchmarkPutUpload/count_100_parallel_2-8 300 5411742 ns/op 2081608 B/op 2364 allocs/op +// BenchmarkPutUpload/count_100_parallel_4-8 500 3704964 ns/op 2081696 B/op 2324 allocs/op +// BenchmarkPutUpload/count_100_parallel_8-8 500 2932663 ns/op 2082594 B/op 2295 allocs/op +// BenchmarkPutUpload/count_100_parallel_16-8 500 3117157 ns/op 2085438 B/op 2282 allocs/op +// BenchmarkPutUpload/count_100_parallel_32-8 500 3449122 ns/op 2089721 B/op 2286 allocs/op +// BenchmarkPutUpload/count_1000_parallel_1-8 20 79784470 ns/op 25211240 B/op 23225 allocs/op +// BenchmarkPutUpload/count_1000_parallel_2-8 20 75422164 ns/op 25210730 B/op 23187 allocs/op +// BenchmarkPutUpload/count_1000_parallel_4-8 20 70698378 ns/op 25206522 B/op 22692 allocs/op +// BenchmarkPutUpload/count_1000_parallel_8-8 20 71285528 ns/op 25213436 B/op 22345 allocs/op +// BenchmarkPutUpload/count_1000_parallel_16-8 20 71301826 ns/op 25205040 B/op 22090 allocs/op +// BenchmarkPutUpload/count_1000_parallel_32-8 30 57713506 ns/op 25219781 B/op 21848 allocs/op +// BenchmarkPutUpload/count_10000_parallel_1-8 2 656719345 ns/op 216792908 B/op 248940 allocs/op +// BenchmarkPutUpload/count_10000_parallel_2-8 2 646301962 ns/op 216730800 B/op 248270 allocs/op +// BenchmarkPutUpload/count_10000_parallel_4-8 2 532784228 ns/op 216667080 B/op 241910 allocs/op +// BenchmarkPutUpload/count_10000_parallel_8-8 3 494290188 ns/op 216297749 B/op 236247 allocs/op +// BenchmarkPutUpload/count_10000_parallel_16-8 3 483485315 ns/op 216060384 B/op 231090 allocs/op +// BenchmarkPutUpload/count_10000_parallel_32-8 3 434461294 ns/op 215371280 B/op 224800 allocs/op +// BenchmarkPutUpload/count_100000_parallel_1-8 1 22767894338 ns/op 2331372088 B/op 4049876 allocs/op +// BenchmarkPutUpload/count_100000_parallel_2-8 1 25347872677 ns/op 2344140160 B/op 4106763 allocs/op +// BenchmarkPutUpload/count_100000_parallel_4-8 1 23580460174 ns/op 2338582576 B/op 4027452 allocs/op +// BenchmarkPutUpload/count_100000_parallel_8-8 1 22197559193 ns/op 2321803496 B/op 3877553 allocs/op +// BenchmarkPutUpload/count_100000_parallel_16-8 1 22527046476 ns/op 2327854800 B/op 3885455 allocs/op +// BenchmarkPutUpload/count_100000_parallel_32-8 1 21332243613 ns/op 2299654568 B/op 3697181 allocs/op +// PASS +func BenchmarkPutUpload(b *testing.B) { + for _, count := range []int{ + 100, + 1000, + 10000, + 100000, + } { + for _, maxParallelUploads := range []int{ + 1, + 2, + 4, + 8, + 16, + 32, + } { + name := fmt.Sprintf("count %v parallel %v", count, maxParallelUploads) + b.Run(name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkPutUpload(b, nil, count, maxParallelUploads) + } + }) + } + } +} + +// benchmarkPutUpload runs a benchmark by uploading a specific number +// of chunks with specified max parallel uploads. +func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) { + b.StopTimer() + db, cleanupFunc := newTestDB(b, o) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + chunks := make([]storage.Chunk, count) + for i := 0; i < count; i++ { + chunks[i] = generateFakeRandomChunk() + } + errs := make(chan error) + b.StartTimer() + + go func() { + sem := make(chan struct{}, maxParallelUploads) + for i := 0; i < count; i++ { + sem <- struct{}{} + + go func(i int) { + defer func() { <-sem }() + + errs <- uploader.Put(chunks[i]) + }(i) + } + }() + + for i := 0; i < count; i++ { + err := <-errs + if err != nil { + b.Fatal(err) + } + } +} diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go new file mode 100644 index 000000000..a522f4447 --- /dev/null +++ b/swarm/storage/localstore/mode_set.go @@ -0,0 +1,205 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// ModeSet enumerates different Setter modes. +type ModeSet int + +// Setter modes. +const ( + // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery + ModeSetAccess ModeSet = iota + // ModeSetSync: when push sync receipt is received + ModeSetSync + // modeSetRemove: when GC-d + // unexported as no external packages should remove chunks from database + modeSetRemove +) + +// Setter sets the state of a particular +// Chunk in database by changing indexes. +type Setter struct { + db *DB + mode ModeSet +} + +// NewSetter returns a new Setter on database +// with a specific Mode. +func (db *DB) NewSetter(mode ModeSet) *Setter { + return &Setter{ + mode: mode, + db: db, + } +} + +// Set updates database indexes for a specific +// chunk represented by the address. +func (s *Setter) Set(addr storage.Address) (err error) { + return s.db.set(s.mode, addr) +} + +// set updates database indexes for a specific +// chunk represented by the address. +// It acquires lockAddr to protect two calls +// of this function for the same address in parallel. +func (db *DB) set(mode ModeSet, addr storage.Address) (err error) { + // protect parallel updates + unlock, err := db.lockAddr(addr) + if err != nil { + return err + } + defer unlock() + + batch := new(leveldb.Batch) + + // variables that provide information for operations + // to be done after write batch function successfully executes + var gcSizeChange int64 // number to add or subtract from gcSize + var triggerPullFeed bool // signal pull feed subscriptions to iterate + + item := addressToItem(addr) + + switch mode { + case ModeSetAccess: + // add to pull, insert to gc + + // need to get access timestamp here as it is not + // provided by the access function, and it is not + // a property of a chunk provided to Accessor.Put. + + i, err := db.retrievalDataIndex.Get(item) + switch err { + case nil: + item.StoreTimestamp = i.StoreTimestamp + case leveldb.ErrNotFound: + db.pushIndex.DeleteInBatch(batch, item) + item.StoreTimestamp = now() + default: + return err + } + + i, err = db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + case leveldb.ErrNotFound: + // the chunk is not accessed before + default: + return err + } + item.AccessTimestamp = now() + db.retrievalAccessIndex.PutInBatch(batch, item) + db.pullIndex.PutInBatch(batch, item) + triggerPullFeed = true + db.gcIndex.PutInBatch(batch, item) + db.gcUncountedHashesIndex.PutInBatch(batch, item) + gcSizeChange++ + + case ModeSetSync: + // delete from push, insert to gc + + // need to get access timestamp here as it is not + // provided by the access function, and it is not + // a property of a chunk provided to Accessor.Put. + i, err := db.retrievalDataIndex.Get(item) + if err != nil { + if err == leveldb.ErrNotFound { + // chunk is not found, + // no need to update gc index + // just delete from the push index + // if it is there + db.pushIndex.DeleteInBatch(batch, item) + return nil + } + return err + } + item.StoreTimestamp = i.StoreTimestamp + + i, err = db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + case leveldb.ErrNotFound: + // the chunk is not accessed before + default: + return err + } + item.AccessTimestamp = now() + db.retrievalAccessIndex.PutInBatch(batch, item) + db.pushIndex.DeleteInBatch(batch, item) + db.gcIndex.PutInBatch(batch, item) + db.gcUncountedHashesIndex.PutInBatch(batch, item) + gcSizeChange++ + + case modeSetRemove: + // delete from retrieve, pull, gc + + // need to get access timestamp here as it is not + // provided by the access function, and it is not + // a property of a chunk provided to Accessor.Put. + + i, err := db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + case leveldb.ErrNotFound: + default: + return err + } + i, err = db.retrievalDataIndex.Get(item) + if err != nil { + return err + } + item.StoreTimestamp = i.StoreTimestamp + + db.retrievalDataIndex.DeleteInBatch(batch, item) + db.retrievalAccessIndex.DeleteInBatch(batch, item) + db.pullIndex.DeleteInBatch(batch, item) + db.gcIndex.DeleteInBatch(batch, item) + db.gcUncountedHashesIndex.DeleteInBatch(batch, item) + // a check is needed for decrementing gcSize + // as delete is not reporting if the key/value pair + // is deleted or not + if _, err := db.gcIndex.Get(item); err == nil { + gcSizeChange = -1 + } + + default: + return ErrInvalidMode + } + + err = db.shed.WriteBatch(batch) + if err != nil { + return err + } + if gcSizeChange != 0 { + db.incGCSize(gcSizeChange) + } + if triggerPullFeed { + db.triggerPullSubscriptions(db.po(item.Address)) + } + return nil +} diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go new file mode 100644 index 000000000..94cd0a3e2 --- /dev/null +++ b/swarm/storage/localstore/mode_set_test.go @@ -0,0 +1,128 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "testing" + "time" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestModeSetAccess validates ModeSetAccess index values on the provided DB. +func TestModeSetAccess(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunk := generateRandomChunk() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + err := db.NewSetter(ModeSetAccess).Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil)) + + t.Run("pull index count", newItemsCountTest(db.pullIndex, 1)) + + t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) +} + +// TestModeSetSync validates ModeSetSync index values on the provided DB. +func TestModeSetSync(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunk := generateRandomChunk() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + err = db.NewSetter(ModeSetSync).Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp)) + + t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound)) + + t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 1)) + + t.Run("gc size", newIndexGCSizeTest(db)) +} + +// TestModeSetRemove validates ModeSetRemove index values on the provided DB. +func TestModeSetRemove(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + chunk := generateRandomChunk() + + err := db.NewPutter(ModePutUpload).Put(chunk) + if err != nil { + t.Fatal(err) + } + + err = db.NewSetter(modeSetRemove).Set(chunk.Address()) + if err != nil { + t.Fatal(err) + } + + t.Run("retrieve indexes", func(t *testing.T) { + wantErr := leveldb.ErrNotFound + _, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) + if err != wantErr { + t.Errorf("got error %v, want %v", err, wantErr) + } + t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0)) + + // access index should not be set + _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) + if err != wantErr { + t.Errorf("got error %v, want %v", err, wantErr) + } + t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0)) + }) + + t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound)) + + t.Run("pull index count", newItemsCountTest(db.pullIndex, 0)) + + t.Run("gc index count", newItemsCountTest(db.gcIndex, 0)) + + t.Run("gc size", newIndexGCSizeTest(db)) + +} diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go new file mode 100644 index 000000000..9f5b452c5 --- /dev/null +++ b/swarm/storage/localstore/retrieval_index_test.go @@ -0,0 +1,150 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "strconv" + "testing" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// BenchmarkRetrievalIndexes uploads a number of chunks in order to measure +// total time of updating their retrieval indexes by setting them +// to synced state and requesting them. +// +// This benchmark takes significant amount of time. +// +// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show +// that two separated indexes perform better. +// +// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkRetrievalIndexes -v +// goos: darwin +// goarch: amd64 +// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore +// BenchmarkRetrievalIndexes/1000-8 20 75556686 ns/op 19033493 B/op 84500 allocs/op +// BenchmarkRetrievalIndexes/10000-8 1 1079084922 ns/op 382792064 B/op 1429644 allocs/op +// BenchmarkRetrievalIndexes/100000-8 1 16891305737 ns/op 2629165304 B/op 12465019 allocs/op +// PASS +func BenchmarkRetrievalIndexes(b *testing.B) { + for _, count := range []int{ + 1000, + 10000, + 100000, + } { + b.Run(strconv.Itoa(count)+"-split", func(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkRetrievalIndexes(b, nil, count) + } + }) + } +} + +// benchmarkRetrievalIndexes is used in BenchmarkRetrievalIndexes +// to do benchmarks with a specific number of chunks and different +// database options. +func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) { + b.StopTimer() + db, cleanupFunc := newTestDB(b, o) + defer cleanupFunc() + uploader := db.NewPutter(ModePutUpload) + syncer := db.NewSetter(ModeSetSync) + requester := db.NewGetter(ModeGetRequest) + addrs := make([]storage.Address, count) + for i := 0; i < count; i++ { + chunk := generateFakeRandomChunk() + err := uploader.Put(chunk) + if err != nil { + b.Fatal(err) + } + addrs[i] = chunk.Address() + } + // set update gc test hook to signal when + // update gc goroutine is done by sending to + // testHookUpdateGCChan channel, which is + // used to wait for gc index updates to be + // included in the benchmark time + testHookUpdateGCChan := make(chan struct{}) + defer setTestHookUpdateGC(func() { + testHookUpdateGCChan <- struct{}{} + })() + b.StartTimer() + + for i := 0; i < count; i++ { + err := syncer.Set(addrs[i]) + if err != nil { + b.Fatal(err) + } + + _, err = requester.Get(addrs[i]) + if err != nil { + b.Fatal(err) + } + // wait for update gc goroutine to be done + <-testHookUpdateGCChan + } +} + +// BenchmarkUpload compares uploading speed for different +// retrieval indexes and various number of chunks. +// +// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014). +// +// go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkUpload -v +// goos: darwin +// goarch: amd64 +// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore +// BenchmarkUpload/1000-8 20 59437463 ns/op 25205193 B/op 23208 allocs/op +// BenchmarkUpload/10000-8 2 580646362 ns/op 216532932 B/op 248090 allocs/op +// BenchmarkUpload/100000-8 1 22373390892 ns/op 2323055312 B/op 3995903 allocs/op +// PASS +func BenchmarkUpload(b *testing.B) { + for _, count := range []int{ + 1000, + 10000, + 100000, + } { + b.Run(strconv.Itoa(count), func(b *testing.B) { + for n := 0; n < b.N; n++ { + benchmarkUpload(b, nil, count) + } + }) + } +} + +// benchmarkUpload is used in BenchmarkUpload +// to do benchmarks with a specific number of chunks and different +// database options. +func benchmarkUpload(b *testing.B, o *Options, count int) { + b.StopTimer() + db, cleanupFunc := newTestDB(b, o) + defer cleanupFunc() + uploader := db.NewPutter(ModePutUpload) + chunks := make([]storage.Chunk, count) + for i := 0; i < count; i++ { + chunk := generateFakeRandomChunk() + chunks[i] = chunk + } + b.StartTimer() + + for i := 0; i < count; i++ { + err := uploader.Put(chunks[i]) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go new file mode 100644 index 000000000..a18f0915d --- /dev/null +++ b/swarm/storage/localstore/subscription_pull.go @@ -0,0 +1,193 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index. +// Pull syncing index can be only subscribed to a particular proximity order bin. If since +// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil, +// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be +// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop +// function will terminate current and further iterations without errors, and also close the returned channel. +// Make sure that you check the second returned parameter from the channel to stop iteration when its value +// is false. +func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) { + chunkDescriptors := make(chan ChunkDescriptor) + trigger := make(chan struct{}, 1) + + db.pullTriggersMu.Lock() + if _, ok := db.pullTriggers[bin]; !ok { + db.pullTriggers[bin] = make([]chan struct{}, 0) + } + db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger) + db.pullTriggersMu.Unlock() + + // send signal for the initial iteration + trigger <- struct{}{} + + stopChan := make(chan struct{}) + var stopChanOnce sync.Once + + // used to provide information from the iterator to + // stop subscription when until chunk descriptor is reached + var errStopSubscription = errors.New("stop subscription") + + go func() { + // close the returned ChunkDescriptor channel at the end to + // signal that the subscription is done + defer close(chunkDescriptors) + // sinceItem is the Item from which the next iteration + // should start. The first iteration starts from the first Item. + var sinceItem *shed.Item + if since != nil { + sinceItem = &shed.Item{ + Address: since.Address, + StoreTimestamp: since.StoreTimestamp, + } + } + for { + select { + case <-trigger: + // iterate until: + // - last index Item is reached + // - subscription stop is called + // - context is done + err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) { + select { + case chunkDescriptors <- ChunkDescriptor{ + Address: item.Address, + StoreTimestamp: item.StoreTimestamp, + }: + // until chunk descriptor is sent + // break the iteration + if until != nil && + (item.StoreTimestamp >= until.StoreTimestamp || + bytes.Equal(item.Address, until.Address)) { + return true, errStopSubscription + } + // set next iteration start item + // when its chunk is successfully sent to channel + sinceItem = &item + return false, nil + case <-stopChan: + // gracefully stop the iteration + // on stop + return true, nil + case <-db.close: + // gracefully stop the iteration + // on database close + return true, nil + case <-ctx.Done(): + return true, ctx.Err() + } + }, &shed.IterateOptions{ + StartFrom: sinceItem, + // sinceItem was sent as the last Address in the previous + // iterator call, skip it in this one + SkipStartFromItem: true, + Prefix: []byte{bin}, + }) + if err != nil { + if err == errStopSubscription { + // stop subscription without any errors + // if until is reached + return + } + log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err) + return + } + case <-stopChan: + // terminate the subscription + // on stop + return + case <-db.close: + // terminate the subscription + // on database close + return + case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err) + } + return + } + } + }() + + stop = func() { + stopChanOnce.Do(func() { + close(stopChan) + }) + + db.pullTriggersMu.Lock() + defer db.pullTriggersMu.Unlock() + + for i, t := range db.pullTriggers[bin] { + if t == trigger { + db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...) + break + } + } + } + + return chunkDescriptors, stop +} + +// ChunkDescriptor holds information required for Pull syncing. This struct +// is provided by subscribing to pull index. +type ChunkDescriptor struct { + Address storage.Address + StoreTimestamp int64 +} + +func (c *ChunkDescriptor) String() string { + if c == nil { + return "none" + } + return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp) +} + +// triggerPullSubscriptions is used internally for starting iterations +// on Pull subscriptions for a particular bin. When new item with address +// that is in particular bin for DB's baseKey is added to pull index +// this function should be called. +func (db *DB) triggerPullSubscriptions(bin uint8) { + db.pullTriggersMu.RLock() + triggers, ok := db.pullTriggers[bin] + db.pullTriggersMu.RUnlock() + if !ok { + return + } + + for _, t := range triggers { + select { + case t <- struct{}{}: + default: + } + } +} diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go new file mode 100644 index 000000000..5c99e0dec --- /dev/null +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -0,0 +1,478 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// TestDB_SubscribePull uploads some chunks before and after +// pull syncing subscription is created and validates if +// all addresses are received in the right order +// for expected proximity order bins. +func TestDB_SubscribePull(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]storage.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + // prepopulate database with some chunks + // before the subscription + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + ch, stop := db.SubscribePull(ctx, bin, nil, nil) + defer stop() + + // receive and validate addresses from the subscription + go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan) + } + + // upload some chunks just after subscribe + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5) + + time.Sleep(200 * time.Millisecond) + + // upload some chunks after some short time + // to ensure that subscription will include them + // in a dynamic environment + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3) + + checkErrChan(ctx, t, errChan, wantedChunksCount) +} + +// TestDB_SubscribePull_multiple uploads chunks before and after +// multiple pull syncing subscriptions are created and +// validates if all addresses are received in the right order +// for expected proximity order bins. +func TestDB_SubscribePull_multiple(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]storage.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + // prepopulate database with some chunks + // before the subscription + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 10) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + subsCount := 10 + + // start a number of subscriptions + // that all of them will write every address error to errChan + for j := 0; j < subsCount; j++ { + for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + ch, stop := db.SubscribePull(ctx, bin, nil, nil) + defer stop() + + // receive and validate addresses from the subscription + go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan) + } + } + + // upload some chunks just after subscribe + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 5) + + time.Sleep(200 * time.Millisecond) + + // upload some chunks after some short time + // to ensure that subscription will include them + // in a dynamic environment + uploadRandomChunksBin(t, db, uploader, addrs, &addrsMu, &wantedChunksCount, 3) + + checkErrChan(ctx, t, errChan, wantedChunksCount*subsCount) +} + +// TestDB_SubscribePull_since uploads chunks before and after +// pull syncing subscriptions are created with a since argument +// and validates if all expected addresses are received in the +// right order for expected proximity order bins. +func TestDB_SubscribePull_since(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]storage.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + lastTimestamp := time.Now().UTC().UnixNano() + var lastTimestampMu sync.RWMutex + defer setNow(func() (t int64) { + lastTimestampMu.Lock() + defer lastTimestampMu.Unlock() + lastTimestamp++ + return lastTimestamp + })() + + uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + last = make(map[uint8]ChunkDescriptor) + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + bin := db.po(chunk.Address()) + + addrsMu.Lock() + if _, ok := addrs[bin]; !ok { + addrs[bin] = make([]storage.Address, 0) + } + if wanted { + addrs[bin] = append(addrs[bin], chunk.Address()) + wantedChunksCount++ + } + addrsMu.Unlock() + + lastTimestampMu.RLock() + storeTimestamp := lastTimestamp + lastTimestampMu.RUnlock() + + last[bin] = ChunkDescriptor{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + } + } + return last + } + + // prepopulate database with some chunks + // before the subscription + last := uploadRandomChunks(30, false) + + uploadRandomChunks(25, true) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + var since *ChunkDescriptor + if c, ok := last[bin]; ok { + since = &c + } + ch, stop := db.SubscribePull(ctx, bin, since, nil) + defer stop() + + // receive and validate addresses from the subscription + go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan) + + } + + // upload some chunks just after subscribe + uploadRandomChunks(15, true) + + checkErrChan(ctx, t, errChan, wantedChunksCount) +} + +// TestDB_SubscribePull_until uploads chunks before and after +// pull syncing subscriptions are created with an until argument +// and validates if all expected addresses are received in the +// right order for expected proximity order bins. +func TestDB_SubscribePull_until(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]storage.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + lastTimestamp := time.Now().UTC().UnixNano() + var lastTimestampMu sync.RWMutex + defer setNow(func() (t int64) { + lastTimestampMu.Lock() + defer lastTimestampMu.Unlock() + lastTimestamp++ + return lastTimestamp + })() + + uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + last = make(map[uint8]ChunkDescriptor) + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + bin := db.po(chunk.Address()) + + addrsMu.Lock() + if _, ok := addrs[bin]; !ok { + addrs[bin] = make([]storage.Address, 0) + } + if wanted { + addrs[bin] = append(addrs[bin], chunk.Address()) + wantedChunksCount++ + } + addrsMu.Unlock() + + lastTimestampMu.RLock() + storeTimestamp := lastTimestamp + lastTimestampMu.RUnlock() + + last[bin] = ChunkDescriptor{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + } + } + return last + } + + // prepopulate database with some chunks + // before the subscription + last := uploadRandomChunks(30, true) + + uploadRandomChunks(25, false) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + until, ok := last[bin] + if !ok { + continue + } + ch, stop := db.SubscribePull(ctx, bin, nil, &until) + defer stop() + + // receive and validate addresses from the subscription + go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan) + } + + // upload some chunks just after subscribe + uploadRandomChunks(15, false) + + checkErrChan(ctx, t, errChan, wantedChunksCount) +} + +// TestDB_SubscribePull_sinceAndUntil uploads chunks before and +// after pull syncing subscriptions are created with since +// and until arguments, and validates if all expected addresses +// are received in the right order for expected proximity order bins. +func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make(map[uint8][]storage.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + lastTimestamp := time.Now().UTC().UnixNano() + var lastTimestampMu sync.RWMutex + defer setNow(func() (t int64) { + lastTimestampMu.Lock() + defer lastTimestampMu.Unlock() + lastTimestamp++ + return lastTimestamp + })() + + uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + last = make(map[uint8]ChunkDescriptor) + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + bin := db.po(chunk.Address()) + + addrsMu.Lock() + if _, ok := addrs[bin]; !ok { + addrs[bin] = make([]storage.Address, 0) + } + if wanted { + addrs[bin] = append(addrs[bin], chunk.Address()) + wantedChunksCount++ + } + addrsMu.Unlock() + + lastTimestampMu.RLock() + storeTimestamp := lastTimestamp + lastTimestampMu.RUnlock() + + last[bin] = ChunkDescriptor{ + Address: chunk.Address(), + StoreTimestamp: storeTimestamp, + } + } + return last + } + + // all chunks from upload1 are not expected + // as upload1 chunk is used as since for subscriptions + upload1 := uploadRandomChunks(100, false) + + // all chunks from upload2 are expected + // as upload2 chunk is used as until for subscriptions + upload2 := uploadRandomChunks(100, true) + + // upload some chunks before subscribe but after + // wanted chunks + uploadRandomChunks(8, false) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + var since *ChunkDescriptor + if c, ok := upload1[bin]; ok { + since = &c + } + until, ok := upload2[bin] + if !ok { + // no chunks un this bin uploaded in the upload2 + // skip this bin from testing + continue + } + ch, stop := db.SubscribePull(ctx, bin, since, &until) + defer stop() + + // receive and validate addresses from the subscription + go readPullSubscriptionBin(ctx, bin, ch, addrs, &addrsMu, errChan) + } + + // upload some chunks just after subscribe + uploadRandomChunks(15, false) + + checkErrChan(ctx, t, errChan, wantedChunksCount) +} + +// uploadRandomChunksBin uploads random chunks to database and adds them to +// the map of addresses ber bin. +func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) { + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + addrsMu.Lock() + bin := db.po(chunk.Address()) + if _, ok := addrs[bin]; !ok { + addrs[bin] = make([]storage.Address, 0) + } + addrs[bin] = append(addrs[bin], chunk.Address()) + addrsMu.Unlock() + + *wantedChunksCount++ + } +} + +// readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and +// sends error to errChan, even if it is nil, to count the number of ChunkDescriptors +// returned by the channel. +func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, errChan chan error) { + var i int // address index + for { + select { + case got, ok := <-ch: + if !ok { + return + } + addrsMu.Lock() + if i+1 > len(addrs[bin]) { + errChan <- fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin) + } + want := addrs[bin][i] + addrsMu.Unlock() + var err error + if !bytes.Equal(got.Address, want) { + err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want) + } + i++ + // send one and only one error per received address + errChan <- err + case <-ctx.Done(): + return + } + } +} + +// checkErrChan expects the number of wantedChunksCount errors from errChan +// and calls t.Error for the ones that are not nil. +func checkErrChan(ctx context.Context, t *testing.T, errChan chan error, wantedChunksCount int) { + t.Helper() + + for i := 0; i < wantedChunksCount; i++ { + select { + case err := <-errChan: + if err != nil { + t.Error(err) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + } +} diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go new file mode 100644 index 000000000..b13f29399 --- /dev/null +++ b/swarm/storage/localstore/subscription_push.go @@ -0,0 +1,145 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "context" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index. +// Returned stop function will terminate current and further iterations, and also it will close +// the returned channel without any errors. Make sure that you check the second returned parameter +// from the channel to stop iteration when its value is false. +func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop func()) { + chunks := make(chan storage.Chunk) + trigger := make(chan struct{}, 1) + + db.pushTriggersMu.Lock() + db.pushTriggers = append(db.pushTriggers, trigger) + db.pushTriggersMu.Unlock() + + // send signal for the initial iteration + trigger <- struct{}{} + + stopChan := make(chan struct{}) + var stopChanOnce sync.Once + + go func() { + // close the returned chunkInfo channel at the end to + // signal that the subscription is done + defer close(chunks) + // sinceItem is the Item from which the next iteration + // should start. The first iteration starts from the first Item. + var sinceItem *shed.Item + for { + select { + case <-trigger: + // iterate until: + // - last index Item is reached + // - subscription stop is called + // - context is done + err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) { + // get chunk data + dataItem, err := db.retrievalDataIndex.Get(item) + if err != nil { + return true, err + } + + select { + case chunks <- storage.NewChunk(dataItem.Address, dataItem.Data): + // set next iteration start item + // when its chunk is successfully sent to channel + sinceItem = &item + return false, nil + case <-stopChan: + // gracefully stop the iteration + // on stop + return true, nil + case <-db.close: + // gracefully stop the iteration + // on database close + return true, nil + case <-ctx.Done(): + return true, ctx.Err() + } + }, &shed.IterateOptions{ + StartFrom: sinceItem, + // sinceItem was sent as the last Address in the previous + // iterator call, skip it in this one + SkipStartFromItem: true, + }) + if err != nil { + log.Error("localstore push subscription iteration", "err", err) + return + } + case <-stopChan: + // terminate the subscription + // on stop + return + case <-db.close: + // terminate the subscription + // on database close + return + case <-ctx.Done(): + err := ctx.Err() + if err != nil { + log.Error("localstore push subscription", "err", err) + } + return + } + } + }() + + stop = func() { + stopChanOnce.Do(func() { + close(stopChan) + }) + + db.pushTriggersMu.Lock() + defer db.pushTriggersMu.Unlock() + + for i, t := range db.pushTriggers { + if t == trigger { + db.pushTriggers = append(db.pushTriggers[:i], db.pushTriggers[i+1:]...) + break + } + } + } + + return chunks, stop +} + +// triggerPushSubscriptions is used internally for starting iterations +// on Push subscriptions. Whenever new item is added to the push index, +// this function should be called. +func (db *DB) triggerPushSubscriptions() { + db.pushTriggersMu.RLock() + triggers := db.pushTriggers + db.pushTriggersMu.RUnlock() + + for _, t := range triggers { + select { + case t <- struct{}{}: + default: + } + } +} diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go new file mode 100644 index 000000000..73e7c25f7 --- /dev/null +++ b/swarm/storage/localstore/subscription_push_test.go @@ -0,0 +1,200 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package localstore + +import ( + "bytes" + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// TestDB_SubscribePush uploads some chunks before and after +// push syncing subscription is created and validates if +// all addresses are received in the right order. +func TestDB_SubscribePush(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + chunks := make([]storage.Chunk, 0) + var chunksMu sync.Mutex + + uploadRandomChunks := func(count int) { + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + chunksMu.Lock() + chunks = append(chunks, chunk) + chunksMu.Unlock() + } + } + + // prepopulate database with some chunks + // before the subscription + uploadRandomChunks(10) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + ch, stop := db.SubscribePush(ctx) + defer stop() + + // receive and validate addresses from the subscription + go func() { + var i int // address index + for { + select { + case got, ok := <-ch: + if !ok { + return + } + chunksMu.Lock() + want := chunks[i] + chunksMu.Unlock() + var err error + if !bytes.Equal(got.Data(), want.Data()) { + err = fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), want.Data()) + } + if !bytes.Equal(got.Address(), want.Address()) { + err = fmt.Errorf("got chunk %v address %s, want %s", i, got.Address().Hex(), want.Address().Hex()) + } + i++ + // send one and only one error per received address + errChan <- err + case <-ctx.Done(): + return + } + } + }() + + // upload some chunks just after subscribe + uploadRandomChunks(5) + + time.Sleep(200 * time.Millisecond) + + // upload some chunks after some short time + // to ensure that subscription will include them + // in a dynamic environment + uploadRandomChunks(3) + + checkErrChan(ctx, t, errChan, len(chunks)) +} + +// TestDB_SubscribePush_multiple uploads chunks before and after +// multiple push syncing subscriptions are created and +// validates if all addresses are received in the right order. +func TestDB_SubscribePush_multiple(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + uploader := db.NewPutter(ModePutUpload) + + addrs := make([]storage.Address, 0) + var addrsMu sync.Mutex + + uploadRandomChunks := func(count int) { + for i := 0; i < count; i++ { + chunk := generateRandomChunk() + + err := uploader.Put(chunk) + if err != nil { + t.Fatal(err) + } + + addrsMu.Lock() + addrs = append(addrs, chunk.Address()) + addrsMu.Unlock() + } + } + + // prepopulate database with some chunks + // before the subscription + uploadRandomChunks(10) + + // set a timeout on subscription + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // collect all errors from validating addresses, even nil ones + // to validate the number of addresses received by the subscription + errChan := make(chan error) + + subsCount := 10 + + // start a number of subscriptions + // that all of them will write every addresses error to errChan + for j := 0; j < subsCount; j++ { + ch, stop := db.SubscribePush(ctx) + defer stop() + + // receive and validate addresses from the subscription + go func(j int) { + var i int // address index + for { + select { + case got, ok := <-ch: + if !ok { + return + } + addrsMu.Lock() + want := addrs[i] + addrsMu.Unlock() + var err error + if !bytes.Equal(got.Address(), want) { + err = fmt.Errorf("got chunk %v address on subscription %v %s, want %s", i, j, got, want) + } + i++ + // send one and only one error per received address + errChan <- err + case <-ctx.Done(): + return + } + } + }(j) + } + + // upload some chunks just after subscribe + uploadRandomChunks(5) + + time.Sleep(200 * time.Millisecond) + + // upload some chunks after some short time + // to ensure that subscription will include them + // in a dynamic environment + uploadRandomChunks(3) + + // number of addresses received by all subscriptions + wantedChunksCount := len(addrs) * subsCount + + checkErrChan(ctx, t, errChan, wantedChunksCount) +}