ff3a5d24d2
* swarm/storage: remove redundant increments for dataIdx and entryCnt * swarm/storage: add Delete to LDBStore * swarm/storage: wait for garbage collection
815 lines
20 KiB
Go
815 lines
20 KiB
Go
// Copyright 2016 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// disk storage layer for the package bzz
|
|
// DbStore implements the ChunkStore interface and is used by the FileStore as
|
|
// persistent storage of chunks
|
|
// it implements purging based on access count allowing for external control of
|
|
// max capacity
|
|
|
|
package storage
|
|
|
|
import (
|
|
"archive/tar"
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
|
"github.com/ethereum/go-ethereum/swarm/log"
|
|
"github.com/ethereum/go-ethereum/swarm/storage/mock"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
)
|
|
|
|
const (
|
|
gcArrayFreeRatio = 0.1
|
|
maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage()
|
|
)
|
|
|
|
var (
|
|
dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
|
|
)
|
|
|
|
var (
|
|
keyIndex = byte(0)
|
|
keyOldData = byte(1)
|
|
keyAccessCnt = []byte{2}
|
|
keyEntryCnt = []byte{3}
|
|
keyDataIdx = []byte{4}
|
|
keyData = byte(6)
|
|
keyDistanceCnt = byte(7)
|
|
)
|
|
|
|
type gcItem struct {
|
|
idx uint64
|
|
value uint64
|
|
idxKey []byte
|
|
po uint8
|
|
}
|
|
|
|
type LDBStoreParams struct {
|
|
*StoreParams
|
|
Path string
|
|
Po func(Address) uint8
|
|
}
|
|
|
|
// NewLDBStoreParams constructs LDBStoreParams with the specified values.
|
|
func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
|
|
return &LDBStoreParams{
|
|
StoreParams: storeparams,
|
|
Path: path,
|
|
Po: func(k Address) (ret uint8) { return uint8(Proximity(storeparams.BaseKey[:], k[:])) },
|
|
}
|
|
}
|
|
|
|
type LDBStore struct {
|
|
db *LDBDatabase
|
|
|
|
// this should be stored in db, accessed transactionally
|
|
entryCnt uint64 // number of items in the LevelDB
|
|
accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
|
|
dataIdx uint64 // similar to entryCnt, but we only increment it
|
|
capacity uint64
|
|
bucketCnt []uint64
|
|
|
|
hashfunc SwarmHasher
|
|
po func(Address) uint8
|
|
|
|
batchC chan bool
|
|
batchesC chan struct{}
|
|
batch *leveldb.Batch
|
|
lock sync.RWMutex
|
|
quit chan struct{}
|
|
|
|
// Functions encodeDataFunc is used to bypass
|
|
// the default functionality of DbStore with
|
|
// mock.NodeStore for testing purposes.
|
|
encodeDataFunc func(chunk *Chunk) []byte
|
|
// If getDataFunc is defined, it will be used for
|
|
// retrieving the chunk data instead from the local
|
|
// LevelDB database.
|
|
getDataFunc func(addr Address) (data []byte, err error)
|
|
}
|
|
|
|
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
|
|
// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
|
|
// a function different from the one that is actually used.
|
|
func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
|
|
s = new(LDBStore)
|
|
s.hashfunc = params.Hash
|
|
s.quit = make(chan struct{})
|
|
|
|
s.batchC = make(chan bool)
|
|
s.batchesC = make(chan struct{}, 1)
|
|
go s.writeBatches()
|
|
s.batch = new(leveldb.Batch)
|
|
// associate encodeData with default functionality
|
|
s.encodeDataFunc = encodeData
|
|
|
|
s.db, err = NewLDBDatabase(params.Path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.po = params.Po
|
|
s.setCapacity(params.DbCapacity)
|
|
|
|
s.bucketCnt = make([]uint64, 0x100)
|
|
for i := 0; i < 0x100; i++ {
|
|
k := make([]byte, 2)
|
|
k[0] = keyDistanceCnt
|
|
k[1] = uint8(i)
|
|
cnt, _ := s.db.Get(k)
|
|
s.bucketCnt[i] = BytesToU64(cnt)
|
|
s.bucketCnt[i]++
|
|
}
|
|
data, _ := s.db.Get(keyEntryCnt)
|
|
s.entryCnt = BytesToU64(data)
|
|
data, _ = s.db.Get(keyAccessCnt)
|
|
s.accessCnt = BytesToU64(data)
|
|
data, _ = s.db.Get(keyDataIdx)
|
|
s.dataIdx = BytesToU64(data)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// NewMockDbStore creates a new instance of DbStore with
|
|
// mockStore set to a provided value. If mockStore argument is nil,
|
|
// this function behaves exactly as NewDbStore.
|
|
func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
|
|
s, err = NewLDBStore(params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// replace put and get with mock store functionality
|
|
if mockStore != nil {
|
|
s.encodeDataFunc = newMockEncodeDataFunc(mockStore)
|
|
s.getDataFunc = newMockGetDataFunc(mockStore)
|
|
}
|
|
return
|
|
}
|
|
|
|
type dpaDBIndex struct {
|
|
Idx uint64
|
|
Access uint64
|
|
}
|
|
|
|
func BytesToU64(data []byte) uint64 {
|
|
if len(data) < 8 {
|
|
return 0
|
|
}
|
|
return binary.BigEndian.Uint64(data)
|
|
}
|
|
|
|
func U64ToBytes(val uint64) []byte {
|
|
data := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(data, val)
|
|
return data
|
|
}
|
|
|
|
func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) {
|
|
index.Access = s.accessCnt
|
|
}
|
|
|
|
func getIndexKey(hash Address) []byte {
|
|
hashSize := len(hash)
|
|
key := make([]byte, hashSize+1)
|
|
key[0] = keyIndex
|
|
copy(key[1:], hash[:])
|
|
return key
|
|
}
|
|
|
|
func getOldDataKey(idx uint64) []byte {
|
|
key := make([]byte, 9)
|
|
key[0] = keyOldData
|
|
binary.BigEndian.PutUint64(key[1:9], idx)
|
|
|
|
return key
|
|
}
|
|
|
|
func getDataKey(idx uint64, po uint8) []byte {
|
|
key := make([]byte, 10)
|
|
key[0] = keyData
|
|
key[1] = po
|
|
binary.BigEndian.PutUint64(key[2:], idx)
|
|
|
|
return key
|
|
}
|
|
|
|
func encodeIndex(index *dpaDBIndex) []byte {
|
|
data, _ := rlp.EncodeToBytes(index)
|
|
return data
|
|
}
|
|
|
|
func encodeData(chunk *Chunk) []byte {
|
|
// Always create a new underlying array for the returned byte slice.
|
|
// The chunk.Key array may be used in the returned slice which
|
|
// may be changed later in the code or by the LevelDB, resulting
|
|
// that the Key is changed as well.
|
|
return append(append([]byte{}, chunk.Addr[:]...), chunk.SData...)
|
|
}
|
|
|
|
func decodeIndex(data []byte, index *dpaDBIndex) error {
|
|
dec := rlp.NewStream(bytes.NewReader(data), 0)
|
|
return dec.Decode(index)
|
|
}
|
|
|
|
func decodeData(data []byte, chunk *Chunk) {
|
|
chunk.SData = data[32:]
|
|
chunk.Size = int64(binary.BigEndian.Uint64(data[32:40]))
|
|
}
|
|
|
|
func decodeOldData(data []byte, chunk *Chunk) {
|
|
chunk.SData = data
|
|
chunk.Size = int64(binary.BigEndian.Uint64(data[0:8]))
|
|
}
|
|
|
|
func (s *LDBStore) collectGarbage(ratio float32) {
|
|
log.Trace("collectGarbage", "ratio", ratio)
|
|
|
|
metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)
|
|
|
|
it := s.db.NewIterator()
|
|
defer it.Release()
|
|
|
|
garbage := []*gcItem{}
|
|
gcnt := 0
|
|
|
|
for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() {
|
|
itkey := it.Key()
|
|
|
|
if (itkey == nil) || (itkey[0] != keyIndex) {
|
|
break
|
|
}
|
|
|
|
// it.Key() contents change on next call to it.Next(), so we must copy it
|
|
key := make([]byte, len(it.Key()))
|
|
copy(key, it.Key())
|
|
|
|
val := it.Value()
|
|
|
|
var index dpaDBIndex
|
|
|
|
hash := key[1:]
|
|
decodeIndex(val, &index)
|
|
po := s.po(hash)
|
|
|
|
gci := &gcItem{
|
|
idxKey: key,
|
|
idx: index.Idx,
|
|
value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below.
|
|
po: po,
|
|
}
|
|
|
|
garbage = append(garbage, gci)
|
|
gcnt++
|
|
}
|
|
|
|
sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value })
|
|
|
|
cutoff := int(float32(gcnt) * ratio)
|
|
metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff))
|
|
|
|
for i := 0; i < cutoff; i++ {
|
|
s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po)
|
|
}
|
|
}
|
|
|
|
// Export writes all chunks from the store to a tar archive, returning the
|
|
// number of chunks written.
|
|
func (s *LDBStore) Export(out io.Writer) (int64, error) {
|
|
tw := tar.NewWriter(out)
|
|
defer tw.Close()
|
|
|
|
it := s.db.NewIterator()
|
|
defer it.Release()
|
|
var count int64
|
|
for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
|
|
key := it.Key()
|
|
if (key == nil) || (key[0] != keyIndex) {
|
|
break
|
|
}
|
|
|
|
var index dpaDBIndex
|
|
|
|
hash := key[1:]
|
|
decodeIndex(it.Value(), &index)
|
|
po := s.po(hash)
|
|
datakey := getDataKey(index.Idx, po)
|
|
log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
|
|
data, err := s.db.Get(datakey)
|
|
if err != nil {
|
|
log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
|
|
continue
|
|
}
|
|
|
|
hdr := &tar.Header{
|
|
Name: hex.EncodeToString(hash),
|
|
Mode: 0644,
|
|
Size: int64(len(data)),
|
|
}
|
|
if err := tw.WriteHeader(hdr); err != nil {
|
|
return count, err
|
|
}
|
|
if _, err := tw.Write(data); err != nil {
|
|
return count, err
|
|
}
|
|
count++
|
|
}
|
|
|
|
return count, nil
|
|
}
|
|
|
|
// of chunks read.
|
|
func (s *LDBStore) Import(in io.Reader) (int64, error) {
|
|
tr := tar.NewReader(in)
|
|
|
|
var count int64
|
|
var wg sync.WaitGroup
|
|
for {
|
|
hdr, err := tr.Next()
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return count, err
|
|
}
|
|
|
|
if len(hdr.Name) != 64 {
|
|
log.Warn("ignoring non-chunk file", "name", hdr.Name)
|
|
continue
|
|
}
|
|
|
|
keybytes, err := hex.DecodeString(hdr.Name)
|
|
if err != nil {
|
|
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
|
|
continue
|
|
}
|
|
|
|
data, err := ioutil.ReadAll(tr)
|
|
if err != nil {
|
|
return count, err
|
|
}
|
|
key := Address(keybytes)
|
|
chunk := NewChunk(key, nil)
|
|
chunk.SData = data[32:]
|
|
s.Put(context.TODO(), chunk)
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
<-chunk.dbStoredC
|
|
}()
|
|
count++
|
|
}
|
|
wg.Wait()
|
|
return count, nil
|
|
}
|
|
|
|
func (s *LDBStore) Cleanup() {
|
|
//Iterates over the database and checks that there are no chunks bigger than 4kb
|
|
var errorsFound, removed, total int
|
|
|
|
it := s.db.NewIterator()
|
|
defer it.Release()
|
|
for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
|
|
key := it.Key()
|
|
if (key == nil) || (key[0] != keyIndex) {
|
|
break
|
|
}
|
|
total++
|
|
var index dpaDBIndex
|
|
err := decodeIndex(it.Value(), &index)
|
|
if err != nil {
|
|
log.Warn("Cannot decode")
|
|
errorsFound++
|
|
continue
|
|
}
|
|
hash := key[1:]
|
|
po := s.po(hash)
|
|
datakey := getDataKey(index.Idx, po)
|
|
data, err := s.db.Get(datakey)
|
|
if err != nil {
|
|
found := false
|
|
|
|
// highest possible proximity is 255
|
|
for po = 1; po <= 255; po++ {
|
|
datakey = getDataKey(index.Idx, po)
|
|
data, err = s.db.Get(datakey)
|
|
if err == nil {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key[:]))
|
|
errorsFound++
|
|
continue
|
|
}
|
|
}
|
|
|
|
c := &Chunk{}
|
|
ck := data[:32]
|
|
decodeData(data, c)
|
|
|
|
cs := int64(binary.LittleEndian.Uint64(c.SData[:8]))
|
|
log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
|
|
|
|
if len(c.SData) > chunk.DefaultSize+8 {
|
|
log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
|
|
s.delete(index.Idx, getIndexKey(key[1:]), po)
|
|
removed++
|
|
errorsFound++
|
|
}
|
|
}
|
|
|
|
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
|
|
}
|
|
|
|
func (s *LDBStore) ReIndex() {
|
|
//Iterates over the database and checks that there are no faulty chunks
|
|
it := s.db.NewIterator()
|
|
startPosition := []byte{keyOldData}
|
|
it.Seek(startPosition)
|
|
var key []byte
|
|
var errorsFound, total int
|
|
for it.Valid() {
|
|
key = it.Key()
|
|
if (key == nil) || (key[0] != keyOldData) {
|
|
break
|
|
}
|
|
data := it.Value()
|
|
hasher := s.hashfunc()
|
|
hasher.Write(data)
|
|
hash := hasher.Sum(nil)
|
|
|
|
newKey := make([]byte, 10)
|
|
oldCntKey := make([]byte, 2)
|
|
newCntKey := make([]byte, 2)
|
|
oldCntKey[0] = keyDistanceCnt
|
|
newCntKey[0] = keyDistanceCnt
|
|
key[0] = keyData
|
|
key[1] = s.po(Address(key[1:]))
|
|
oldCntKey[1] = key[1]
|
|
newCntKey[1] = s.po(Address(newKey[1:]))
|
|
copy(newKey[2:], key[1:])
|
|
newValue := append(hash, data...)
|
|
|
|
batch := new(leveldb.Batch)
|
|
batch.Delete(key)
|
|
s.bucketCnt[oldCntKey[1]]--
|
|
batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
|
|
batch.Put(newKey, newValue)
|
|
s.bucketCnt[newCntKey[1]]++
|
|
batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
|
|
s.db.Write(batch)
|
|
it.Next()
|
|
}
|
|
it.Release()
|
|
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
|
|
}
|
|
|
|
func (s *LDBStore) Delete(addr Address) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
ikey := getIndexKey(addr)
|
|
|
|
var indx dpaDBIndex
|
|
s.tryAccessIdx(ikey, &indx)
|
|
|
|
s.delete(indx.Idx, ikey, s.po(addr))
|
|
}
|
|
|
|
func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) {
|
|
metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)
|
|
|
|
batch := new(leveldb.Batch)
|
|
batch.Delete(idxKey)
|
|
batch.Delete(getDataKey(idx, po))
|
|
s.entryCnt--
|
|
dbEntryCount.Dec(1)
|
|
s.bucketCnt[po]--
|
|
cntKey := make([]byte, 2)
|
|
cntKey[0] = keyDistanceCnt
|
|
cntKey[1] = po
|
|
batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
|
|
batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
|
|
s.db.Write(batch)
|
|
}
|
|
|
|
func (s *LDBStore) CurrentBucketStorageIndex(po uint8) uint64 {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
return s.bucketCnt[po]
|
|
}
|
|
|
|
func (s *LDBStore) Size() uint64 {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.entryCnt
|
|
}
|
|
|
|
func (s *LDBStore) CurrentStorageIndex() uint64 {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
return s.dataIdx
|
|
}
|
|
|
|
func (s *LDBStore) Put(ctx context.Context, chunk *Chunk) {
|
|
metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
|
|
log.Trace("ldbstore.put", "key", chunk.Addr)
|
|
|
|
ikey := getIndexKey(chunk.Addr)
|
|
var index dpaDBIndex
|
|
|
|
po := s.po(chunk.Addr)
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
log.Trace("ldbstore.put: s.db.Get", "key", chunk.Addr, "ikey", fmt.Sprintf("%x", ikey))
|
|
idata, err := s.db.Get(ikey)
|
|
if err != nil {
|
|
s.doPut(chunk, &index, po)
|
|
batchC := s.batchC
|
|
go func() {
|
|
<-batchC
|
|
chunk.markAsStored()
|
|
}()
|
|
} else {
|
|
log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Addr)
|
|
decodeIndex(idata, &index)
|
|
chunk.markAsStored()
|
|
}
|
|
index.Access = s.accessCnt
|
|
s.accessCnt++
|
|
idata = encodeIndex(&index)
|
|
s.batch.Put(ikey, idata)
|
|
select {
|
|
case s.batchesC <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// force putting into db, does not check access index
|
|
func (s *LDBStore) doPut(chunk *Chunk, index *dpaDBIndex, po uint8) {
|
|
data := s.encodeDataFunc(chunk)
|
|
dkey := getDataKey(s.dataIdx, po)
|
|
s.batch.Put(dkey, data)
|
|
index.Idx = s.dataIdx
|
|
s.bucketCnt[po] = s.dataIdx
|
|
s.entryCnt++
|
|
dbEntryCount.Inc(1)
|
|
s.dataIdx++
|
|
|
|
cntKey := make([]byte, 2)
|
|
cntKey[0] = keyDistanceCnt
|
|
cntKey[1] = po
|
|
s.batch.Put(cntKey, U64ToBytes(s.bucketCnt[po]))
|
|
}
|
|
|
|
func (s *LDBStore) writeBatches() {
|
|
mainLoop:
|
|
for {
|
|
select {
|
|
case <-s.quit:
|
|
break mainLoop
|
|
case <-s.batchesC:
|
|
s.lock.Lock()
|
|
b := s.batch
|
|
e := s.entryCnt
|
|
d := s.dataIdx
|
|
a := s.accessCnt
|
|
c := s.batchC
|
|
s.batchC = make(chan bool)
|
|
s.batch = new(leveldb.Batch)
|
|
err := s.writeBatch(b, e, d, a)
|
|
// TODO: set this error on the batch, then tell the chunk
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err))
|
|
}
|
|
close(c)
|
|
for e > s.capacity {
|
|
log.Trace("for >", "e", e, "s.capacity", s.capacity)
|
|
// Collect garbage in a separate goroutine
|
|
// to be able to interrupt this loop by s.quit.
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.collectGarbage(gcArrayFreeRatio)
|
|
log.Trace("collectGarbage closing done")
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-s.quit:
|
|
s.lock.Unlock()
|
|
break mainLoop
|
|
case <-done:
|
|
}
|
|
e = s.entryCnt
|
|
}
|
|
s.lock.Unlock()
|
|
}
|
|
}
|
|
log.Trace(fmt.Sprintf("DbStore: quit batch write loop"))
|
|
}
|
|
|
|
// must be called non concurrently
|
|
func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uint64) error {
|
|
b.Put(keyEntryCnt, U64ToBytes(entryCnt))
|
|
b.Put(keyDataIdx, U64ToBytes(dataIdx))
|
|
b.Put(keyAccessCnt, U64ToBytes(accessCnt))
|
|
l := b.Len()
|
|
if err := s.db.Write(b); err != nil {
|
|
return fmt.Errorf("unable to write batch: %v", err)
|
|
}
|
|
log.Trace(fmt.Sprintf("batch write (%d entries)", l))
|
|
return nil
|
|
}
|
|
|
|
// newMockEncodeDataFunc returns a function that stores the chunk data
|
|
// to a mock store to bypass the default functionality encodeData.
|
|
// The constructed function always returns the nil data, as DbStore does
|
|
// not need to store the data, but still need to create the index.
|
|
func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk *Chunk) []byte {
|
|
return func(chunk *Chunk) []byte {
|
|
if err := mockStore.Put(chunk.Addr, encodeData(chunk)); err != nil {
|
|
log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Addr.Log(), err))
|
|
}
|
|
return chunk.Addr[:]
|
|
}
|
|
}
|
|
|
|
// try to find index; if found, update access cnt and return true
|
|
func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
|
|
idata, err := s.db.Get(ikey)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
decodeIndex(idata, index)
|
|
s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
|
|
s.accessCnt++
|
|
index.Access = s.accessCnt
|
|
idata = encodeIndex(index)
|
|
s.batch.Put(ikey, idata)
|
|
select {
|
|
case s.batchesC <- struct{}{}:
|
|
default:
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s *LDBStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
|
|
metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
|
|
log.Trace("ldbstore.get", "key", addr)
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.get(addr)
|
|
}
|
|
|
|
func (s *LDBStore) get(addr Address) (chunk *Chunk, err error) {
|
|
var indx dpaDBIndex
|
|
|
|
if s.tryAccessIdx(getIndexKey(addr), &indx) {
|
|
var data []byte
|
|
if s.getDataFunc != nil {
|
|
// if getDataFunc is defined, use it to retrieve the chunk data
|
|
log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
|
|
data, err = s.getDataFunc(addr)
|
|
if err != nil {
|
|
return
|
|
}
|
|
} else {
|
|
// default DbStore functionality to retrieve chunk data
|
|
proximity := s.po(addr)
|
|
datakey := getDataKey(indx.Idx, proximity)
|
|
data, err = s.db.Get(datakey)
|
|
log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
|
|
if err != nil {
|
|
log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
|
|
s.delete(indx.Idx, getIndexKey(addr), s.po(addr))
|
|
return
|
|
}
|
|
}
|
|
|
|
chunk = NewChunk(addr, nil)
|
|
chunk.markAsStored()
|
|
decodeData(data, chunk)
|
|
} else {
|
|
err = ErrChunkNotFound
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// newMockGetFunc returns a function that reads chunk data from
|
|
// the mock database, which is used as the value for DbStore.getFunc
|
|
// to bypass the default functionality of DbStore with a mock store.
|
|
func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
|
|
return func(addr Address) (data []byte, err error) {
|
|
data, err = mockStore.Get(addr)
|
|
if err == mock.ErrNotFound {
|
|
// preserve ErrChunkNotFound error
|
|
err = ErrChunkNotFound
|
|
}
|
|
return data, err
|
|
}
|
|
}
|
|
|
|
func (s *LDBStore) updateAccessCnt(addr Address) {
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
var index dpaDBIndex
|
|
s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt
|
|
|
|
}
|
|
|
|
func (s *LDBStore) setCapacity(c uint64) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.capacity = c
|
|
|
|
if s.entryCnt > c {
|
|
ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
|
|
if ratio < gcArrayFreeRatio {
|
|
ratio = gcArrayFreeRatio
|
|
}
|
|
if ratio > 1 {
|
|
ratio = 1
|
|
}
|
|
for s.entryCnt > c {
|
|
s.collectGarbage(ratio)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *LDBStore) Close() {
|
|
close(s.quit)
|
|
s.db.Close()
|
|
}
|
|
|
|
// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
|
|
func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
|
|
metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)
|
|
|
|
sincekey := getDataKey(since, po)
|
|
untilkey := getDataKey(until, po)
|
|
it := s.db.NewIterator()
|
|
defer it.Release()
|
|
|
|
for ok := it.Seek(sincekey); ok; ok = it.Next() {
|
|
metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)
|
|
|
|
dbkey := it.Key()
|
|
if dbkey[0] != keyData || dbkey[1] != po || bytes.Compare(untilkey, dbkey) < 0 {
|
|
break
|
|
}
|
|
key := make([]byte, 32)
|
|
val := it.Value()
|
|
copy(key, val[:32])
|
|
if !f(Address(key), binary.BigEndian.Uint64(dbkey[2:])) {
|
|
break
|
|
}
|
|
}
|
|
return it.Error()
|
|
}
|
|
|
|
func databaseExists(path string) bool {
|
|
o := &opt.Options{
|
|
ErrorIfMissing: true,
|
|
}
|
|
tdb, err := leveldb.OpenFile(path, o)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer tdb.Close()
|
|
return true
|
|
}
|