diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 1449efccd..7540af8ce 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -1,5 +1,109 @@ package chunk -const ( - DefaultSize = 4096 +import ( + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" ) + +const ( + DefaultSize = 4096 + MaxPO = 16 + AddressLength = 32 +) + +var ( + ErrChunkNotFound = errors.New("chunk not found") + ErrChunkInvalid = errors.New("invalid chunk") +) + +type Chunk interface { + Address() Address + Data() []byte +} + +type chunk struct { + addr Address + sdata []byte +} + +func NewChunk(addr Address, data []byte) *chunk { + return &chunk{ + addr: addr, + sdata: data, + } +} + +func (c *chunk) Address() Address { + return c.addr +} + +func (c *chunk) Data() []byte { + return c.sdata +} + +func (self *chunk) String() string { + return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata)) +} + +type Address []byte + +var ZeroAddr = Address(common.Hash{}.Bytes()) + +func (a Address) Hex() string { + return fmt.Sprintf("%064x", []byte(a[:])) +} + +func (a Address) Log() string { + if len(a[:]) < 8 { + return fmt.Sprintf("%x", []byte(a[:])) + } + return fmt.Sprintf("%016x", []byte(a[:8])) +} + +func (a Address) String() string { + return fmt.Sprintf("%064x", []byte(a)) +} + +func (a Address) MarshalJSON() (out []byte, err error) { + return []byte(`"` + a.String() + `"`), nil +} + +func (a *Address) UnmarshalJSON(value []byte) error { + s := string(value) + *a = make([]byte, 32) + h := common.Hex2Bytes(s[1 : len(s)-1]) + copy(*a, h) + return nil +} + +// Proximity returns the proximity order of the MSB distance between x and y +// +// The distance metric MSB(x, y) of two equal length byte sequences x an y is the +// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. +// the binary cast is big endian: most significant bit first (=MSB). +// +// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. +// It is defined as the reverse rank of the integer part of the base 2 +// logarithm of the distance. +// It is calculated by counting the number of common leading zeros in the (MSB) +// binary representation of the x^y. +// +// (0 farthest, 255 closest, 256 self) +func Proximity(one, other []byte) (ret int) { + b := (MaxPO-1)/8 + 1 + if b > len(one) { + b = len(one) + } + m := 8 + for i := 0; i < b; i++ { + oxo := one[i] ^ other[i] + for j := 0; j < m; j++ { + if (oxo>>uint8(7-j))&0x01 != 0 { + return i*8 + j + } + } + } + return MaxPO +} diff --git a/swarm/storage/types_test.go b/swarm/chunk/proximity_test.go similarity index 99% rename from swarm/storage/types_test.go rename to swarm/chunk/proximity_test.go index 32907bbf4..5632114b1 100644 --- a/swarm/storage/types_test.go +++ b/swarm/chunk/proximity_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package storage +package chunk import ( "strconv" diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 0fa5026dc..5b36b477e 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -25,7 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/metrics" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" @@ -127,7 +127,7 @@ type TreeChunker struct { func TreeJoin(ctx context.Context, addr Address, getter Getter, depth int) *LazyChunkReader { jp := &JoinerParams{ ChunkerParams: ChunkerParams{ - chunkSize: ch.DefaultSize, + chunkSize: chunk.DefaultSize, hashSize: int64(len(addr)), }, addr: addr, @@ -147,7 +147,7 @@ func TreeSplit(ctx context.Context, data io.Reader, size int64, putter Putter) ( tsp := &TreeSplitterParams{ SplitterParams: SplitterParams{ ChunkerParams: ChunkerParams{ - chunkSize: ch.DefaultSize, + chunkSize: chunk.DefaultSize, hashSize: putter.RefSize(), }, reader: data, diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index e74d0f4b8..c4d187b62 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -29,7 +29,7 @@ import ( "time" "github.com/ethereum/go-ethereum/log" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/mattn/go-colorable" ) @@ -94,7 +94,7 @@ func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() for i := int64(0); i < int64(n); i++ { - chunk := f(ch.DefaultSize) + chunk := f(chunk.DefaultSize) go func() { select { case errc <- store.Put(ctx, chunk): diff --git a/swarm/storage/error.go b/swarm/storage/error.go index a9d0616fa..1e412e55c 100644 --- a/swarm/storage/error.go +++ b/swarm/storage/error.go @@ -16,9 +16,7 @@ package storage -import ( - "errors" -) +import "github.com/ethereum/go-ethereum/swarm/chunk" const ( ErrInit = iota @@ -31,7 +29,8 @@ const ( ErrNotSynced ) +// Errors are the same as the ones in chunk package for backward compatibility. var ( - ErrChunkNotFound = errors.New("chunk not found") - ErrChunkInvalid = errors.New("invalid chunk") + ErrChunkNotFound = chunk.ErrChunkNotFound + ErrChunkInvalid = chunk.ErrChunkNotFound ) diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index 23b52ee0d..345ce7430 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -21,7 +21,7 @@ import ( "fmt" "sync/atomic" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage/encryption" "golang.org/x/crypto/sha3" ) @@ -156,7 +156,7 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address { return hasher.Sum(nil) } -func (h *hasherStore) createChunk(chunkData ChunkData) *chunk { +func (h *hasherStore) createChunk(chunkData ChunkData) Chunk { hash := h.createHash(chunkData) chunk := NewChunk(hash, chunkData) return chunk @@ -189,9 +189,9 @@ func (h *hasherStore) decryptChunkData(chunkData ChunkData, encryptionKey encryp // removing extra bytes which were just added for padding length := ChunkData(decryptedSpan).Size() - for length > ch.DefaultSize { - length = length + (ch.DefaultSize - 1) - length = length / ch.DefaultSize + for length > chunk.DefaultSize { + length = length + (chunk.DefaultSize - 1) + length = length / chunk.DefaultSize length *= uint64(h.refSize) } @@ -232,14 +232,14 @@ func (h *hasherStore) decrypt(chunkData ChunkData, key encryption.Key) ([]byte, } func (h *hasherStore) newSpanEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, 0, uint32(ch.DefaultSize/h.refSize), sha3.NewLegacyKeccak256) + return encryption.New(key, 0, uint32(chunk.DefaultSize/h.refSize), sha3.NewLegacyKeccak256) } func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryption { - return encryption.New(key, int(ch.DefaultSize), 0, sha3.NewLegacyKeccak256) + return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewLegacyKeccak256) } -func (h *hasherStore) storeChunk(ctx context.Context, chunk *chunk) { +func (h *hasherStore) storeChunk(ctx context.Context, chunk Chunk) { atomic.AddUint64(&h.nrChunks, 1) go func() { select { diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 1d5357713..766a9e031 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -312,7 +312,7 @@ func decodeIndex(data []byte, index *dpaDBIndex) error { return dec.Decode(index) } -func decodeData(addr Address, data []byte) (*chunk, error) { +func decodeData(addr Address, data []byte) (Chunk, error) { return NewChunk(addr, data[32:]), nil } @@ -502,7 +502,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { } // Cleanup iterates over the database and deletes chunks if they pass the `f` condition -func (s *LDBStore) Cleanup(f func(*chunk) bool) { +func (s *LDBStore) Cleanup(f func(Chunk) bool) { var errorsFound, removed, total int it := s.db.NewIterator() @@ -551,12 +551,14 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) { continue } - cs := int64(binary.LittleEndian.Uint64(c.sdata[:8])) - log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) + sdata := c.Data() + + cs := int64(binary.LittleEndian.Uint64(sdata[:8])) + log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs) // if chunk is to be removed if f(c) { - log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) + log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(sdata), "size", cs) s.deleteNow(&index, getIndexKey(key[1:]), po) removed++ errorsFound++ @@ -980,7 +982,7 @@ func (s *LDBStore) Has(_ context.Context, addr Address) bool { } // TODO: To conform with other private methods of this object indices should not be updated -func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { +func (s *LDBStore) get(addr Address) (chunk Chunk, err error) { if s.closed { return nil, ErrDBClosed } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index aa65183e3..d17bd7d0e 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -28,7 +28,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" ldberrors "github.com/syndtr/goleveldb/leveldb/errors" @@ -103,7 +103,7 @@ func TestMarkAccessed(t *testing.T) { t.Fatalf("init dbStore failed: %v", err) } - h := GenerateRandomChunk(ch.DefaultSize) + h := GenerateRandomChunk(chunk.DefaultSize) db.Put(context.Background(), h) @@ -201,7 +201,7 @@ func testIterator(t *testing.T, mock bool) { t.Fatalf("init dbStore failed: %v", err) } - chunks := GenerateRandomChunks(ch.DefaultSize, chunkcount) + chunks := GenerateRandomChunks(chunk.DefaultSize, chunkcount) for i = 0; i < len(chunks); i++ { chunkkeys[i] = chunks[i].Address() @@ -468,7 +468,7 @@ func testLDBStoreRemoveThenCollectGarbage(t *testing.T) { // put capacity count number of chunks chunks := make([]Chunk, n) for i := 0; i < n; i++ { - c := GenerateRandomChunk(ch.DefaultSize) + c := GenerateRandomChunk(chunk.DefaultSize) chunks[i] = c log.Trace("generate random chunk", "idx", i, "chunk", c) } diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index eefb7565a..a8f6f037f 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -241,7 +241,7 @@ func (ls *LocalStore) Migrate() error { func (ls *LocalStore) migrateFromNoneToPurity() { // delete chunks that are not valid, i.e. chunks that do not pass // any of the ls.Validators - ls.DbStore.Cleanup(func(c *chunk) bool { + ls.DbStore.Cleanup(func(c Chunk) bool { return !ls.isValid(c) }) } diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go index 60309d7fa..3964c16d5 100644 --- a/swarm/storage/localstore/gc_test.go +++ b/swarm/storage/localstore/gc_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestDB_collectGarbageWorker tests garbage collection runs @@ -64,11 +64,11 @@ func testDB_collectGarbageWorker(t *testing.T) { uploader := db.NewPutter(ModePutUpload) syncer := db.NewSetter(ModeSetSync) - addrs := make([]storage.Address, 0) + addrs := make([]chunk.Address, 0) // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { @@ -106,8 +106,8 @@ func testDB_collectGarbageWorker(t *testing.T) { // the first synced chunk should be removed t.Run("get the first synced chunk", func(t *testing.T) { _, err := db.NewGetter(ModeGetRequest).Get(addrs[0]) - if err != storage.ErrChunkNotFound { - t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound) + if err != chunk.ErrChunkNotFound { + t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound) } }) @@ -137,11 +137,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { testHookCollectGarbageChan <- collectedCount })() - addrs := make([]storage.Address, 0) + addrs := make([]chunk.Address, 0) // upload random chunks just up to the capacity for i := 0; i < int(db.capacity)-1; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { @@ -156,6 +156,14 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { addrs = append(addrs, chunk.Address()) } + // set update gc test hook to signal when + // update gc goroutine is done by closing + // testHookUpdateGCChan channel + testHookUpdateGCChan := make(chan struct{}) + resetTestHookUpdateGC := setTestHookUpdateGC(func() { + close(testHookUpdateGCChan) + }) + // request the latest synced chunk // to prioritize it in the gc index // not to be collected @@ -164,18 +172,29 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { t.Fatal(err) } + // wait for update gc goroutine to finish for garbage + // collector to be correctly triggered after the last upload + select { + case <-testHookUpdateGCChan: + case <-time.After(10 * time.Second): + t.Fatal("updateGC was not called after getting chunk with ModeGetRequest") + } + + // no need to wait for update gc hook anymore + resetTestHookUpdateGC() + // upload and sync another chunk to trigger // garbage collection - chunk := generateRandomChunk() - err = uploader.Put(chunk) + ch := generateTestRandomChunk() + err = uploader.Put(ch) if err != nil { t.Fatal(err) } - err = syncer.Set(chunk.Address()) + err = syncer.Set(ch.Address()) if err != nil { t.Fatal(err) } - addrs = append(addrs, chunk.Address()) + addrs = append(addrs, ch.Address()) // wait for garbage collection @@ -217,8 +236,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // the second synced chunk should be removed t.Run("get gc-ed chunk", func(t *testing.T) { _, err := db.NewGetter(ModeGetRequest).Get(addrs[1]) - if err != storage.ErrChunkNotFound { - t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound) + if err != chunk.ErrChunkNotFound { + t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound) } }) @@ -254,7 +273,7 @@ func TestDB_gcSize(t *testing.T) { count := 100 for i := 0; i < count; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go index d9abf440f..cf19e4f6c 100644 --- a/swarm/storage/localstore/index_test.go +++ b/swarm/storage/localstore/index_test.go @@ -21,7 +21,7 @@ import ( "math/rand" "testing" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestDB_pullIndex validates the ordering of keys in pull index. @@ -43,7 +43,7 @@ func TestDB_pullIndex(t *testing.T) { // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { @@ -62,8 +62,8 @@ func TestDB_pullIndex(t *testing.T) { } testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) { - poi := storage.Proximity(db.baseKey, chunks[i].Address()) - poj := storage.Proximity(db.baseKey, chunks[j].Address()) + poi := chunk.Proximity(db.baseKey, chunks[i].Address()) + poj := chunk.Proximity(db.baseKey, chunks[j].Address()) if poi < poj { return true } @@ -95,7 +95,7 @@ func TestDB_gcIndex(t *testing.T) { // upload random chunks for i := 0; i < chunkCount; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go index f92a9c1f2..a66130fd3 100644 --- a/swarm/storage/localstore/localstore.go +++ b/swarm/storage/localstore/localstore.go @@ -24,8 +24,8 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/mock" ) @@ -392,8 +392,8 @@ func (db *DB) Close() (err error) { // po computes the proximity order between the address // and database base key. -func (db *DB) po(addr storage.Address) (bin uint8) { - return uint8(storage.Proximity(db.baseKey, addr)) +func (db *DB) po(addr chunk.Address) (bin uint8) { + return uint8(chunk.Proximity(db.baseKey, addr)) } var ( @@ -409,7 +409,7 @@ var ( // If the address is locked this function will check it // in a for loop for addressLockTimeout time, after which // it will return ErrAddressLockTimeout error. -func (db *DB) lockAddr(addr storage.Address) (unlock func(), err error) { +func (db *DB) lockAddr(addr chunk.Address) (unlock func(), err error) { start := time.Now() lockKey := hex.EncodeToString(addr) for { @@ -426,7 +426,7 @@ func (db *DB) lockAddr(addr storage.Address) (unlock func(), err error) { } // chunkToItem creates new Item with data provided by the Chunk. -func chunkToItem(ch storage.Chunk) shed.Item { +func chunkToItem(ch chunk.Chunk) shed.Item { return shed.Item{ Address: ch.Address(), Data: ch.Data(), @@ -434,7 +434,7 @@ func chunkToItem(ch storage.Chunk) shed.Item { } // addressToItem creates new Item with a provided address. -func addressToItem(addr storage.Address) shed.Item { +func addressToItem(addr chunk.Address) shed.Item { return shed.Item{ Address: addr, } diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go index 6954b139a..d10624173 100644 --- a/swarm/storage/localstore/localstore_test.go +++ b/swarm/storage/localstore/localstore_test.go @@ -29,9 +29,8 @@ import ( "testing" "time" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" "github.com/syndtr/goleveldb/leveldb" ) @@ -61,7 +60,7 @@ func TestDB(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { @@ -115,7 +114,7 @@ func TestDB_updateGCSem(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { @@ -188,7 +187,7 @@ func BenchmarkNew(b *testing.B) { uploader := db.NewPutter(ModePutUpload) syncer := db.NewSetter(ModeSetSync) for i := 0; i < count; i++ { - chunk := generateFakeRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { b.Fatal(err) @@ -251,53 +250,47 @@ func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) { return db, cleanupFunc } -// generateRandomChunk generates a valid Chunk with -// data size of default chunk size. -func generateRandomChunk() storage.Chunk { - return storage.GenerateRandomChunk(ch.DefaultSize) -} - func init() { - // needed for generateFakeRandomChunk + // needed for generateTestRandomChunk rand.Seed(time.Now().UnixNano()) } -// generateFakeRandomChunk generates a Chunk that is not +// generateTestRandomChunk generates a Chunk that is not // valid, but it contains a random key and a random value. -// This function is faster then storage.GenerateRandomChunk +// This function is faster then storage.generateTestRandomChunk // which generates a valid chunk. // Some tests in this package do not need valid chunks, just // random data, and their execution time can be decreased // using this function. -func generateFakeRandomChunk() storage.Chunk { - data := make([]byte, ch.DefaultSize) +func generateTestRandomChunk() chunk.Chunk { + data := make([]byte, chunk.DefaultSize) rand.Read(data) key := make([]byte, 32) rand.Read(key) - return storage.NewChunk(key, data) + return chunk.NewChunk(key, data) } -// TestGenerateFakeRandomChunk validates that -// generateFakeRandomChunk returns random data by comparing +// TestGenerateTestRandomChunk validates that +// generateTestRandomChunk returns random data by comparing // two generated chunks. -func TestGenerateFakeRandomChunk(t *testing.T) { - c1 := generateFakeRandomChunk() - c2 := generateFakeRandomChunk() +func TestGenerateTestRandomChunk(t *testing.T) { + c1 := generateTestRandomChunk() + c2 := generateTestRandomChunk() addrLen := len(c1.Address()) if addrLen != 32 { t.Errorf("first chunk address length %v, want %v", addrLen, 32) } dataLen := len(c1.Data()) - if dataLen != ch.DefaultSize { - t.Errorf("first chunk data length %v, want %v", dataLen, ch.DefaultSize) + if dataLen != chunk.DefaultSize { + t.Errorf("first chunk data length %v, want %v", dataLen, chunk.DefaultSize) } addrLen = len(c2.Address()) if addrLen != 32 { t.Errorf("second chunk address length %v, want %v", addrLen, 32) } dataLen = len(c2.Data()) - if dataLen != ch.DefaultSize { - t.Errorf("second chunk data length %v, want %v", dataLen, ch.DefaultSize) + if dataLen != chunk.DefaultSize { + t.Errorf("second chunk data length %v, want %v", dataLen, chunk.DefaultSize) } if bytes.Equal(c1.Address(), c2.Address()) { t.Error("fake chunks addresses do not differ") @@ -309,7 +302,7 @@ func TestGenerateFakeRandomChunk(t *testing.T) { // newRetrieveIndexesTest returns a test function that validates if the right // chunk values are in the retrieval indexes. -func newRetrieveIndexesTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { +func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { return func(t *testing.T) { item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) if err != nil { @@ -328,7 +321,7 @@ func newRetrieveIndexesTest(db *DB, chunk storage.Chunk, storeTimestamp, accessT // newRetrieveIndexesTestWithAccess returns a test function that validates if the right // chunk values are in the retrieval indexes when access time must be stored. -func newRetrieveIndexesTestWithAccess(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { +func newRetrieveIndexesTestWithAccess(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { return func(t *testing.T) { item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) if err != nil { @@ -348,7 +341,7 @@ func newRetrieveIndexesTestWithAccess(db *DB, chunk storage.Chunk, storeTimestam // newPullIndexTest returns a test function that validates if the right // chunk values are in the pull index. -func newPullIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { +func newPullIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { return func(t *testing.T) { item, err := db.pullIndex.Get(shed.Item{ Address: chunk.Address(), @@ -365,7 +358,7 @@ func newPullIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantErr // newPushIndexTest returns a test function that validates if the right // chunk values are in the push index. -func newPushIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { +func newPushIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) { return func(t *testing.T) { item, err := db.pushIndex.Get(shed.Item{ Address: chunk.Address(), @@ -382,7 +375,7 @@ func newPushIndexTest(db *DB, chunk storage.Chunk, storeTimestamp int64, wantErr // newGCIndexTest returns a test function that validates if the right // chunk values are in the push index. -func newGCIndexTest(db *DB, chunk storage.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { +func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) { return func(t *testing.T) { item, err := db.gcIndex.Get(shed.Item{ Address: chunk.Address(), @@ -436,7 +429,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) { // testIndexChunk embeds storageChunk with additional data that is stored // in database. It is used for index values validations. type testIndexChunk struct { - storage.Chunk + chunk.Chunk storeTimestamp int64 } diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go index 3a69f6e9d..9640cd27e 100644 --- a/swarm/storage/localstore/mode_get.go +++ b/swarm/storage/localstore/mode_get.go @@ -18,8 +18,8 @@ package localstore import ( "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" "github.com/syndtr/goleveldb/leveldb" ) @@ -51,23 +51,23 @@ func (db *DB) NewGetter(mode ModeGet) *Getter { } // Get returns a chunk from the database. If the chunk is -// not found storage.ErrChunkNotFound will be returned. +// not found chunk.ErrChunkNotFound will be returned. // All required indexes will be updated required by the // Getter Mode. -func (g *Getter) Get(addr storage.Address) (chunk storage.Chunk, err error) { +func (g *Getter) Get(addr chunk.Address) (ch chunk.Chunk, err error) { out, err := g.db.get(g.mode, addr) if err != nil { if err == leveldb.ErrNotFound { - return nil, storage.ErrChunkNotFound + return nil, chunk.ErrChunkNotFound } return nil, err } - return storage.NewChunk(out.Address, out.Data), nil + return chunk.NewChunk(out.Address, out.Data), nil } // get returns Item from the retrieval index // and updates other indexes. -func (db *DB) get(mode ModeGet, addr storage.Address) (out shed.Item, err error) { +func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { item := addressToItem(addr) out, err = db.retrievalDataIndex.Get(item) diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go index 6615a3b88..28a70ee0c 100644 --- a/swarm/storage/localstore/mode_get_test.go +++ b/swarm/storage/localstore/mode_get_test.go @@ -32,7 +32,7 @@ func TestModeGetRequest(t *testing.T) { return uploadTimestamp })() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { @@ -146,7 +146,7 @@ func TestModeGetSync(t *testing.T) { return uploadTimestamp })() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go index 1a5a3d1b1..81df43535 100644 --- a/swarm/storage/localstore/mode_put.go +++ b/swarm/storage/localstore/mode_put.go @@ -17,8 +17,8 @@ package localstore import ( + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" "github.com/syndtr/goleveldb/leveldb" ) @@ -53,7 +53,7 @@ func (db *DB) NewPutter(mode ModePut) *Putter { // Put stores the Chunk to database and depending // on the Putter mode, it updates required indexes. -func (p *Putter) Put(ch storage.Chunk) (err error) { +func (p *Putter) Put(ch chunk.Chunk) (err error) { return p.db.put(p.mode, chunkToItem(ch)) } diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go index ffe6a4cb4..8ecae1d2e 100644 --- a/swarm/storage/localstore/mode_put_test.go +++ b/swarm/storage/localstore/mode_put_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestModePutRequest validates ModePutRequest index values on the provided DB. @@ -33,7 +33,7 @@ func TestModePutRequest(t *testing.T) { putter := db.NewPutter(ModePutRequest) - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() // keep the record when the chunk is stored var storeTimestamp int64 @@ -87,7 +87,7 @@ func TestModePutSync(t *testing.T) { return wantTimestamp })() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutSync).Put(chunk) if err != nil { @@ -109,7 +109,7 @@ func TestModePutUpload(t *testing.T) { return wantTimestamp })() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { @@ -132,7 +132,7 @@ func TestModePutUpload_parallel(t *testing.T) { chunkCount := 1000 workerCount := 100 - chunkChan := make(chan storage.Chunk) + chunkChan := make(chan chunk.Chunk) errChan := make(chan error) doneChan := make(chan struct{}) defer close(doneChan) @@ -159,13 +159,13 @@ func TestModePutUpload_parallel(t *testing.T) { }(i) } - chunks := make([]storage.Chunk, 0) + chunks := make([]chunk.Chunk, 0) var chunksMu sync.Mutex // send chunks to workers go func() { for i := 0; i < chunkCount; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() select { case chunkChan <- chunk: case <-doneChan: @@ -271,9 +271,9 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) defer cleanupFunc() uploader := db.NewPutter(ModePutUpload) - chunks := make([]storage.Chunk, count) + chunks := make([]chunk.Chunk, count) for i := 0; i < count; i++ { - chunks[i] = generateFakeRandomChunk() + chunks[i] = generateTestRandomChunk() } errs := make(chan error) b.StartTimer() diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go index a522f4447..a7c9875fe 100644 --- a/swarm/storage/localstore/mode_set.go +++ b/swarm/storage/localstore/mode_set.go @@ -17,7 +17,7 @@ package localstore import ( - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/syndtr/goleveldb/leveldb" ) @@ -53,7 +53,7 @@ func (db *DB) NewSetter(mode ModeSet) *Setter { // Set updates database indexes for a specific // chunk represented by the address. -func (s *Setter) Set(addr storage.Address) (err error) { +func (s *Setter) Set(addr chunk.Address) (err error) { return s.db.set(s.mode, addr) } @@ -61,7 +61,7 @@ func (s *Setter) Set(addr storage.Address) (err error) { // chunk represented by the address. // It acquires lockAddr to protect two calls // of this function for the same address in parallel. -func (db *DB) set(mode ModeSet, addr storage.Address) (err error) { +func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { // protect parallel updates unlock, err := db.lockAddr(addr) if err != nil { diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go index 94cd0a3e2..674aaabec 100644 --- a/swarm/storage/localstore/mode_set_test.go +++ b/swarm/storage/localstore/mode_set_test.go @@ -28,7 +28,7 @@ func TestModeSetAccess(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() wantTimestamp := time.Now().UTC().UnixNano() defer setNow(func() (t int64) { @@ -56,7 +56,7 @@ func TestModeSetSync(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() wantTimestamp := time.Now().UTC().UnixNano() defer setNow(func() (t int64) { @@ -89,7 +89,7 @@ func TestModeSetRemove(t *testing.T) { db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := db.NewPutter(ModePutUpload).Put(chunk) if err != nil { diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go index 9f5b452c5..b08790124 100644 --- a/swarm/storage/localstore/retrieval_index_test.go +++ b/swarm/storage/localstore/retrieval_index_test.go @@ -20,7 +20,7 @@ import ( "strconv" "testing" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // BenchmarkRetrievalIndexes uploads a number of chunks in order to measure @@ -64,9 +64,9 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) { uploader := db.NewPutter(ModePutUpload) syncer := db.NewSetter(ModeSetSync) requester := db.NewGetter(ModeGetRequest) - addrs := make([]storage.Address, count) + addrs := make([]chunk.Address, count) for i := 0; i < count; i++ { - chunk := generateFakeRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { b.Fatal(err) @@ -134,9 +134,9 @@ func benchmarkUpload(b *testing.B, o *Options, count int) { db, cleanupFunc := newTestDB(b, o) defer cleanupFunc() uploader := db.NewPutter(ModePutUpload) - chunks := make([]storage.Chunk, count) + chunks := make([]chunk.Chunk, count) for i := 0; i < count; i++ { - chunk := generateFakeRandomChunk() + chunk := generateTestRandomChunk() chunks[i] = chunk } b.StartTimer() diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go index a18f0915d..0830eee70 100644 --- a/swarm/storage/localstore/subscription_pull.go +++ b/swarm/storage/localstore/subscription_pull.go @@ -24,8 +24,8 @@ import ( "sync" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" ) // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index. @@ -161,7 +161,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkD // ChunkDescriptor holds information required for Pull syncing. This struct // is provided by subscribing to pull index. type ChunkDescriptor struct { - Address storage.Address + Address chunk.Address StoreTimestamp int64 } diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go index 9800329ea..130f0c9fe 100644 --- a/swarm/storage/localstore/subscription_pull_test.go +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestDB_SubscribePull uploads some chunks before and after @@ -37,7 +37,7 @@ func TestDB_SubscribePull(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make(map[uint8][]storage.Address) + addrs := make(map[uint8][]chunk.Address) var addrsMu sync.Mutex var wantedChunksCount int @@ -53,7 +53,7 @@ func TestDB_SubscribePull(t *testing.T) { // to validate the number of addresses received by the subscription errChan := make(chan error) - for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { ch, stop := db.SubscribePull(ctx, bin, nil, nil) defer stop() @@ -84,7 +84,7 @@ func TestDB_SubscribePull_multiple(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make(map[uint8][]storage.Address) + addrs := make(map[uint8][]chunk.Address) var addrsMu sync.Mutex var wantedChunksCount int @@ -105,7 +105,7 @@ func TestDB_SubscribePull_multiple(t *testing.T) { // start a number of subscriptions // that all of them will write every address error to errChan for j := 0; j < subsCount; j++ { - for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { ch, stop := db.SubscribePull(ctx, bin, nil, nil) defer stop() @@ -137,7 +137,7 @@ func TestDB_SubscribePull_since(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make(map[uint8][]storage.Address) + addrs := make(map[uint8][]chunk.Address) var addrsMu sync.Mutex var wantedChunksCount int @@ -156,20 +156,20 @@ func TestDB_SubscribePull_since(t *testing.T) { last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { - chunk := generateRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + err := uploader.Put(ch) if err != nil { t.Fatal(err) } - bin := db.po(chunk.Address()) + bin := db.po(ch.Address()) if _, ok := addrs[bin]; !ok { - addrs[bin] = make([]storage.Address, 0) + addrs[bin] = make([]chunk.Address, 0) } if wanted { - addrs[bin] = append(addrs[bin], chunk.Address()) + addrs[bin] = append(addrs[bin], ch.Address()) wantedChunksCount++ } @@ -178,7 +178,7 @@ func TestDB_SubscribePull_since(t *testing.T) { lastTimestampMu.RUnlock() last[bin] = ChunkDescriptor{ - Address: chunk.Address(), + Address: ch.Address(), StoreTimestamp: storeTimestamp, } } @@ -199,7 +199,7 @@ func TestDB_SubscribePull_since(t *testing.T) { // to validate the number of addresses received by the subscription errChan := make(chan error) - for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { var since *ChunkDescriptor if c, ok := last[bin]; ok { since = &c @@ -228,7 +228,7 @@ func TestDB_SubscribePull_until(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make(map[uint8][]storage.Address) + addrs := make(map[uint8][]chunk.Address) var addrsMu sync.Mutex var wantedChunksCount int @@ -247,20 +247,20 @@ func TestDB_SubscribePull_until(t *testing.T) { last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { - chunk := generateRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + err := uploader.Put(ch) if err != nil { t.Fatal(err) } - bin := db.po(chunk.Address()) + bin := db.po(ch.Address()) if _, ok := addrs[bin]; !ok { - addrs[bin] = make([]storage.Address, 0) + addrs[bin] = make([]chunk.Address, 0) } if wanted { - addrs[bin] = append(addrs[bin], chunk.Address()) + addrs[bin] = append(addrs[bin], ch.Address()) wantedChunksCount++ } @@ -269,7 +269,7 @@ func TestDB_SubscribePull_until(t *testing.T) { lastTimestampMu.RUnlock() last[bin] = ChunkDescriptor{ - Address: chunk.Address(), + Address: ch.Address(), StoreTimestamp: storeTimestamp, } } @@ -290,7 +290,7 @@ func TestDB_SubscribePull_until(t *testing.T) { // to validate the number of addresses received by the subscription errChan := make(chan error) - for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { until, ok := last[bin] if !ok { continue @@ -318,7 +318,7 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make(map[uint8][]storage.Address) + addrs := make(map[uint8][]chunk.Address) var addrsMu sync.Mutex var wantedChunksCount int @@ -337,20 +337,20 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { - chunk := generateRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + err := uploader.Put(ch) if err != nil { t.Fatal(err) } - bin := db.po(chunk.Address()) + bin := db.po(ch.Address()) if _, ok := addrs[bin]; !ok { - addrs[bin] = make([]storage.Address, 0) + addrs[bin] = make([]chunk.Address, 0) } if wanted { - addrs[bin] = append(addrs[bin], chunk.Address()) + addrs[bin] = append(addrs[bin], ch.Address()) wantedChunksCount++ } @@ -359,7 +359,7 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { lastTimestampMu.RUnlock() last[bin] = ChunkDescriptor{ - Address: chunk.Address(), + Address: ch.Address(), StoreTimestamp: storeTimestamp, } } @@ -386,7 +386,7 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { // to validate the number of addresses received by the subscription errChan := make(chan error) - for bin := uint8(0); bin <= uint8(storage.MaxPO); bin++ { + for bin := uint8(0); bin <= uint8(chunk.MaxPO); bin++ { var since *ChunkDescriptor if c, ok := upload1[bin]; ok { since = &c @@ -412,23 +412,23 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { // uploadRandomChunksBin uploads random chunks to database and adds them to // the map of addresses ber bin. -func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) { +func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) { addrsMu.Lock() defer addrsMu.Unlock() for i := 0; i < count; i++ { - chunk := generateRandomChunk() + ch := generateTestRandomChunk() - err := uploader.Put(chunk) + err := uploader.Put(ch) if err != nil { t.Fatal(err) } - bin := db.po(chunk.Address()) + bin := db.po(ch.Address()) if _, ok := addrs[bin]; !ok { - addrs[bin] = make([]storage.Address, 0) + addrs[bin] = make([]chunk.Address, 0) } - addrs[bin] = append(addrs[bin], chunk.Address()) + addrs[bin] = append(addrs[bin], ch.Address()) *wantedChunksCount++ } @@ -437,7 +437,7 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin // readPullSubscriptionBin is a helper function that reads all ChunkDescriptors from a channel and // sends error to errChan, even if it is nil, to count the number of ChunkDescriptors // returned by the channel. -func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, errChan chan error) { +func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDescriptor, addrs map[uint8][]chunk.Address, addrsMu *sync.Mutex, errChan chan error) { var i int // address index for { select { diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go index b13f29399..5cbc2eb6f 100644 --- a/swarm/storage/localstore/subscription_push.go +++ b/swarm/storage/localstore/subscription_push.go @@ -21,16 +21,16 @@ import ( "sync" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" - "github.com/ethereum/go-ethereum/swarm/storage" ) // SubscribePush returns a channel that provides storage chunks with ordering from push syncing index. // Returned stop function will terminate current and further iterations, and also it will close // the returned channel without any errors. Make sure that you check the second returned parameter // from the channel to stop iteration when its value is false. -func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop func()) { - chunks := make(chan storage.Chunk) +func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) { + chunks := make(chan chunk.Chunk) trigger := make(chan struct{}, 1) db.pushTriggersMu.Lock() @@ -65,7 +65,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop f } select { - case chunks <- storage.NewChunk(dataItem.Address, dataItem.Data): + case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data): // set next iteration start item // when its chunk is successfully sent to channel sinceItem = &item diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go index 0c8d7d0b9..30fb98eb2 100644 --- a/swarm/storage/localstore/subscription_push_test.go +++ b/swarm/storage/localstore/subscription_push_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/chunk" ) // TestDB_SubscribePush uploads some chunks before and after @@ -36,7 +36,7 @@ func TestDB_SubscribePush(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - chunks := make([]storage.Chunk, 0) + chunks := make([]chunk.Chunk, 0) var chunksMu sync.Mutex uploadRandomChunks := func(count int) { @@ -44,7 +44,7 @@ func TestDB_SubscribePush(t *testing.T) { defer chunksMu.Unlock() for i := 0; i < count; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { @@ -124,7 +124,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) { uploader := db.NewPutter(ModePutUpload) - addrs := make([]storage.Address, 0) + addrs := make([]chunk.Address, 0) var addrsMu sync.Mutex uploadRandomChunks := func(count int) { @@ -132,7 +132,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) { defer addrsMu.Unlock() for i := 0; i < count; i++ { - chunk := generateRandomChunk() + chunk := generateTestRandomChunk() err := uploader.Put(chunk) if err != nil { diff --git a/swarm/storage/localstore_test.go b/swarm/storage/localstore_test.go index ec69951c4..fcadcefa0 100644 --- a/swarm/storage/localstore_test.go +++ b/swarm/storage/localstore_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" ) var ( @@ -65,7 +65,7 @@ func TestValidator(t *testing.T) { // add content address validator and check puts // bad should fail, good should pass store.Validators = append(store.Validators, NewContentAddressValidator(hashfunc)) - chunks = GenerateRandomChunks(ch.DefaultSize, 2) + chunks = GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] copy(badChunk.Data(), goodChunk.Data()) @@ -83,7 +83,7 @@ func TestValidator(t *testing.T) { var negV boolTestValidator store.Validators = append(store.Validators, negV) - chunks = GenerateRandomChunks(ch.DefaultSize, 2) + chunks = GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] copy(badChunk.Data(), goodChunk.Data()) @@ -101,7 +101,7 @@ func TestValidator(t *testing.T) { var posV boolTestValidator = true store.Validators = append(store.Validators, posV) - chunks = GenerateRandomChunks(ch.DefaultSize, 2) + chunks = GenerateRandomChunks(chunk.DefaultSize, 2) goodChunk = chunks[0] badChunk = chunks[1] copy(badChunk.Data(), goodChunk.Data()) @@ -138,7 +138,7 @@ func putChunks(store *LocalStore, chunks ...Chunk) []error { func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []error) { for i := int64(0); i < int64(n); i++ { - chunk := f(ch.DefaultSize) + chunk := f(chunk.DefaultSize) err := store.Put(context.TODO(), chunk) errs = append(errs, err) hs = append(hs, chunk.Address()) @@ -158,7 +158,7 @@ func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) { var chunks []Chunk for i := 0; i < ldbCap; i++ { - chunks = append(chunks, GenerateRandomChunk(ch.DefaultSize)) + chunks = append(chunks, GenerateRandomChunk(chunk.DefaultSize)) } mostAccessed := chunks[0].Address() diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 88ec6c28f..653877625 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -29,7 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p/enode" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" ) var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9") @@ -114,7 +114,7 @@ func mustNewNetStoreWithFetcher(t *testing.T) (*NetStore, *mockNetFetcher) { func TestNetStoreGetAndPut(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -174,7 +174,7 @@ func TestNetStoreGetAndPut(t *testing.T) { func TestNetStoreGetAfterPut(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -209,7 +209,7 @@ func TestNetStoreGetAfterPut(t *testing.T) { func TestNetStoreGetTimeout(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -261,7 +261,7 @@ func TestNetStoreGetTimeout(t *testing.T) { func TestNetStoreGetCancel(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -313,7 +313,7 @@ func TestNetStoreGetCancel(t *testing.T) { func TestNetStoreMultipleGetAndPut(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -387,7 +387,7 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) { func TestNetStoreFetchFuncTimeout(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() @@ -426,7 +426,7 @@ func TestNetStoreFetchFuncTimeout(t *testing.T) { func TestNetStoreFetchFuncAfterPut(t *testing.T) { netStore := mustNewNetStore(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -453,7 +453,7 @@ func TestNetStoreFetchFuncAfterPut(t *testing.T) { func TestNetStoreGetCallsRequest(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx := context.WithValue(context.Background(), "hopcount", uint8(5)) ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) @@ -481,7 +481,7 @@ func TestNetStoreGetCallsRequest(t *testing.T) { func TestNetStoreGetCallsOffer(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) // If a source peer is added to the context, NetStore will handle it as an offer ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) @@ -567,7 +567,7 @@ func TestNetStoreFetcherCountPeers(t *testing.T) { func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -632,7 +632,7 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { netStore, fetcher := mustNewNetStoreWithFetcher(t) - chunk := GenerateRandomChunk(ch.DefaultSize) + chunk := GenerateRandomChunk(chunk.DefaultSize) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index ed0f843b9..281bbe9fe 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -25,7 +25,7 @@ import ( "sync" "time" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" ) @@ -97,11 +97,11 @@ func NewPyramidSplitterParams(addr Address, reader io.Reader, putter Putter, get New chunks to store are store using the putter which the caller provides. */ func PyramidSplit(ctx context.Context, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, ch.DefaultSize)).Split(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(nil, reader, putter, getter, chunk.DefaultSize)).Split(ctx) } func PyramidAppend(ctx context.Context, addr Address, reader io.Reader, putter Putter, getter Getter) (Address, func(context.Context) error, error) { - return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, ch.DefaultSize)).Append(ctx) + return NewPyramidSplitter(NewPyramidSplitterParams(addr, reader, putter, getter, chunk.DefaultSize)).Append(ctx) } // Entry to create a tree node diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 7ec21328e..2f39685b4 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -22,53 +22,29 @@ import ( "crypto" "crypto/rand" "encoding/binary" - "fmt" "io" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/swarm/bmt" - ch "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/chunk" "golang.org/x/crypto/sha3" ) -const MaxPO = 16 -const AddressLength = 32 +// MaxPO is the same as chunk.MaxPO for backward compatibility. +const MaxPO = chunk.MaxPO + +// AddressLength is the same as chunk.AddressLength for backward compatibility. +const AddressLength = chunk.AddressLength type SwarmHasher func() SwarmHash -type Address []byte +// Address is an alias for chunk.Address for backward compatibility. +type Address = chunk.Address -// Proximity(x, y) returns the proximity order of the MSB distance between x and y -// -// The distance metric MSB(x, y) of two equal length byte sequences x an y is the -// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. -// the binary cast is big endian: most significant bit first (=MSB). -// -// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. -// It is defined as the reverse rank of the integer part of the base 2 -// logarithm of the distance. -// It is calculated by counting the number of common leading zeros in the (MSB) -// binary representation of the x^y. -// -// (0 farthest, 255 closest, 256 self) -func Proximity(one, other []byte) (ret int) { - b := (MaxPO-1)/8 + 1 - if b > len(one) { - b = len(one) - } - m := 8 - for i := 0; i < b; i++ { - oxo := one[i] ^ other[i] - for j := 0; j < m; j++ { - if (oxo>>uint8(7-j))&0x01 != 0 { - return i*8 + j - } - } - } - return MaxPO -} +// Proximity is the same as chunk.Proximity for backward compatibility. +var Proximity = chunk.Proximity -var ZeroAddr = Address(common.Hash{}.Bytes()) +// ZeroAddr is the same as chunk.ZeroAddr for backward compatibility. +var ZeroAddr = chunk.ZeroAddr func MakeHashFunc(hash string) SwarmHasher { switch hash { @@ -80,7 +56,7 @@ func MakeHashFunc(hash string) SwarmHasher { return func() SwarmHash { hasher := sha3.NewLegacyKeccak256 hasherSize := hasher().Size() - segmentCount := ch.DefaultSize / hasherSize + segmentCount := chunk.DefaultSize / hasherSize pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) return bmt.New(pool) } @@ -88,33 +64,6 @@ func MakeHashFunc(hash string) SwarmHasher { return nil } -func (a Address) Hex() string { - return fmt.Sprintf("%064x", []byte(a[:])) -} - -func (a Address) Log() string { - if len(a[:]) < 8 { - return fmt.Sprintf("%x", []byte(a[:])) - } - return fmt.Sprintf("%016x", []byte(a[:8])) -} - -func (a Address) String() string { - return fmt.Sprintf("%064x", []byte(a)) -} - -func (a Address) MarshalJSON() (out []byte, err error) { - return []byte(`"` + a.String() + `"`), nil -} - -func (a *Address) UnmarshalJSON(value []byte) error { - s := string(value) - *a = make([]byte, 32) - h := common.Hex2Bytes(s[1 : len(s)-1]) - copy(*a, h) - return nil -} - type AddressCollection []Address func NewAddressCollection(l int) AddressCollection { @@ -133,38 +82,11 @@ func (c AddressCollection) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// Chunk interface implemented by context.Contexts and data chunks -type Chunk interface { - Address() Address - Data() []byte -} +// Chunk is an alias for chunk.Chunk for backward compatibility. +type Chunk = chunk.Chunk -type chunk struct { - addr Address - sdata []byte - span int64 -} - -func NewChunk(addr Address, data []byte) *chunk { - return &chunk{ - addr: addr, - sdata: data, - span: -1, - } -} - -func (c *chunk) Address() Address { - return c.addr -} - -func (c *chunk) Data() []byte { - return c.sdata -} - -// String() for pretty printing -func (self *chunk) String() string { - return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, len(self.sdata)) -} +// NewChunk is the same as chunk.NewChunk for backward compatibility. +var NewChunk = chunk.NewChunk func GenerateRandomChunk(dataSize int64) Chunk { hasher := MakeHashFunc(DefaultHash)() @@ -274,9 +196,9 @@ func NewContentAddressValidator(hasher SwarmHasher) *ContentAddressValidator { } // Validate that the given key is a valid content address for the given data -func (v *ContentAddressValidator) Validate(chunk Chunk) bool { - data := chunk.Data() - if l := len(data); l < 9 || l > ch.DefaultSize+8 { +func (v *ContentAddressValidator) Validate(ch Chunk) bool { + data := ch.Data() + if l := len(data); l < 9 || l > chunk.DefaultSize+8 { // log.Error("invalid chunk size", "chunk", addr.Hex(), "size", l) return false } @@ -286,7 +208,7 @@ func (v *ContentAddressValidator) Validate(chunk Chunk) bool { hasher.Write(data[8:]) hash := hasher.Sum(nil) - return bytes.Equal(hash, chunk.Address()) + return bytes.Equal(hash, ch.Address()) } type ChunkStore interface {