support legacy keying: base32 multihashes *sigh*.
This commit is contained in:
parent
f4e13ffd80
commit
7facdf63c9
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/dgraph-io/badger/v2"
|
"github.com/dgraph-io/badger/v2"
|
||||||
"github.com/dgraph-io/badger/v2/options"
|
"github.com/dgraph-io/badger/v2/options"
|
||||||
|
"github.com/multiformats/go-base32"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
blocks "github.com/ipfs/go-block-format"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -17,6 +18,11 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// KeyPool is the buffer pool we use to compute storage keys.
|
||||||
|
KeyPool *pool.BufferPool = pool.GlobalPool
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrBlockstoreClosed is returned from blockstore operations after
|
// ErrBlockstoreClosed is returned from blockstore operations after
|
||||||
// the blockstore has been closed.
|
// the blockstore has been closed.
|
||||||
@ -121,9 +127,9 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
|
|||||||
return ErrBlockstoreClosed
|
return ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.DB.View(func(txn *badger.Txn) error {
|
return b.DB.View(func(txn *badger.Txn) error {
|
||||||
@ -143,9 +149,9 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
|
|||||||
return false, ErrBlockstoreClosed
|
return false, ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.DB.View(func(txn *badger.Txn) error {
|
err := b.DB.View(func(txn *badger.Txn) error {
|
||||||
@ -172,9 +178,9 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
|
|||||||
return nil, ErrBlockstoreClosed
|
return nil, ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
var val []byte
|
var val []byte
|
||||||
@ -200,9 +206,9 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
|
|||||||
return -1, ErrBlockstoreClosed
|
return -1, ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
var size int
|
var size int
|
||||||
@ -228,9 +234,9 @@ func (b *Blockstore) Put(block blocks.Block) error {
|
|||||||
return ErrBlockstoreClosed
|
return ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(block.Cid())
|
k, pooled := b.PooledStorageKey(block.Cid())
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.DB.Update(func(txn *badger.Txn) error {
|
err := b.DB.Update(func(txn *badger.Txn) error {
|
||||||
@ -258,13 +264,13 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
|
|||||||
toReturn = make([][]byte, 0, len(blocks))
|
toReturn = make([][]byte, 0, len(blocks))
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, b := range toReturn {
|
for _, b := range toReturn {
|
||||||
pool.Put(b)
|
KeyPool.Put(b)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, block := range blocks {
|
for _, block := range blocks {
|
||||||
k, pooled := b.PooledPrefixedKey(block.Cid())
|
k, pooled := b.PooledStorageKey(block.Cid())
|
||||||
if pooled {
|
if pooled {
|
||||||
toReturn = append(toReturn, k)
|
toReturn = append(toReturn, k)
|
||||||
}
|
}
|
||||||
@ -285,9 +291,9 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
|
|||||||
return ErrBlockstoreClosed
|
return ErrBlockstoreClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
k, pooled := b.PooledPrefixedKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer pool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.DB.Update(func(txn *badger.Txn) error {
|
return b.DB.Update(func(txn *badger.Txn) error {
|
||||||
@ -312,6 +318,9 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
|||||||
defer close(ch)
|
defer close(ch)
|
||||||
defer iter.Close()
|
defer iter.Close()
|
||||||
|
|
||||||
|
// NewCidV1 makes a copy of the multihash buffer, so we can reuse it to
|
||||||
|
// contain allocs.
|
||||||
|
var buf []byte
|
||||||
for iter.Rewind(); iter.Valid(); iter.Next() {
|
for iter.Rewind(); iter.Valid(); iter.Next() {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return // context has fired.
|
return // context has fired.
|
||||||
@ -324,37 +333,43 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
|||||||
if b.prefixing {
|
if b.prefixing {
|
||||||
k = k[b.prefixLen:]
|
k = k[b.prefixLen:]
|
||||||
}
|
}
|
||||||
ch <- cid.NewCidV1(cid.Raw, k)
|
|
||||||
|
if reqlen := base32.RawStdEncoding.DecodedLen(len(k)); len(buf) < reqlen {
|
||||||
|
buf = make([]byte, reqlen)
|
||||||
|
}
|
||||||
|
if n, err := base32.RawStdEncoding.Decode(buf, k); err == nil {
|
||||||
|
ch <- cid.NewCidV1(cid.Raw, buf[:n])
|
||||||
|
} else {
|
||||||
|
log.Warnf("failed to decode key %s in badger AllKeysChan; err: %s", k, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return ch, nil
|
return ch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blockstore) HashOnRead(enabled bool) {
|
func (b *Blockstore) HashOnRead(_ bool) {
|
||||||
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
|
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blockstore) PrefixedKey(cid cid.Cid) []byte {
|
// PooledStorageKey returns the storage key under which this CID is stored.
|
||||||
|
//
|
||||||
|
// The key is: prefix + base32_no_padding(cid.Hash)
|
||||||
|
//
|
||||||
|
// This method may return pooled byte slice, which MUST be returned to the
|
||||||
|
// KeyPool if pooled=true, or a leak will occur.
|
||||||
|
func (b *Blockstore) PooledStorageKey(cid cid.Cid) (key []byte, pooled bool) {
|
||||||
h := cid.Hash()
|
h := cid.Hash()
|
||||||
|
size := base32.RawStdEncoding.EncodedLen(len(h))
|
||||||
if !b.prefixing {
|
if !b.prefixing {
|
||||||
return h
|
k := pool.Get(size)
|
||||||
}
|
base32.RawStdEncoding.Encode(k, h)
|
||||||
k := make([]byte, b.prefixLen+len(h))
|
return k, true // slicing upto length unnecessary; the pool has already done this.
|
||||||
copy(k, b.prefix)
|
|
||||||
copy(k[b.prefixLen:], h)
|
|
||||||
return k
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blockstore) PooledPrefixedKey(cid cid.Cid) (key []byte, pooled bool) {
|
size += b.prefixLen
|
||||||
h := cid.Hash()
|
|
||||||
if !b.prefixing {
|
|
||||||
return h, false
|
|
||||||
}
|
|
||||||
|
|
||||||
size := b.prefixLen + len(h)
|
|
||||||
k := pool.Get(size)
|
k := pool.Get(size)
|
||||||
copy(k, b.prefix)
|
copy(k, b.prefix)
|
||||||
copy(k[b.prefixLen:], h)
|
base32.RawStdEncoding.Encode(k[b.prefixLen:], h)
|
||||||
return k, true
|
return k, true // slicing upto length unnecessary; the pool has already done this.
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user