lotus/blockstore/buffered.go

175 lines
3.2 KiB
Go
Raw Permalink Normal View History

package blockstore
2019-07-05 14:29:17 +00:00
import (
"context"
"os"
2019-07-05 14:29:17 +00:00
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
// buflog is a logger for the buffered blockstore. It is subscoped from the
// blockstore logger.
var buflog = log.Named("buf")
type BufferedBlockstore struct {
read Blockstore
write Blockstore
2019-07-05 14:29:17 +00:00
}
func NewBuffered(base Blockstore) *BufferedBlockstore {
var buf Blockstore
if os.Getenv("LOTUS_DISABLE_VM_BUF") == "iknowitsabadidea" {
buflog.Warn("VM BLOCKSTORE BUFFERING IS DISABLED")
buf = base
} else {
buf = NewMemory()
}
bs := &BufferedBlockstore{
2019-07-05 14:29:17 +00:00
read: base,
write: buf,
}
return bs
2019-07-05 14:29:17 +00:00
}
func NewTieredBstore(r Blockstore, w Blockstore) *BufferedBlockstore {
return &BufferedBlockstore{
read: r,
write: w,
}
}
2021-01-29 23:17:25 +00:00
var (
_ Blockstore = (*BufferedBlockstore)(nil)
_ Viewer = (*BufferedBlockstore)(nil)
)
2019-07-05 14:29:17 +00:00
func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
2019-07-05 14:29:17 +00:00
a, err := bs.read.AllKeysChan(ctx)
if err != nil {
return nil, err
}
b, err := bs.write.AllKeysChan(ctx)
if err != nil {
return nil, err
}
out := make(chan cid.Cid)
go func() {
defer close(out)
for a != nil || b != nil {
select {
case val, ok := <-a:
if !ok {
a = nil
} else {
select {
case out <- val:
case <-ctx.Done():
return
}
}
case val, ok := <-b:
if !ok {
b = nil
} else {
select {
case out <- val:
case <-ctx.Done():
return
}
}
}
}
}()
return out, nil
}
func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
2019-07-05 14:29:17 +00:00
if err := bs.read.DeleteBlock(c); err != nil {
return err
}
return bs.write.DeleteBlock(c)
}
2021-03-02 14:45:45 +00:00
func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
if err := bs.read.DeleteMany(cids); err != nil {
return err
}
return bs.write.DeleteMany(cids)
}
func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
// both stores are viewable.
2021-02-28 22:20:29 +00:00
if err := bs.write.View(c, callback); err == ErrNotFound {
// not found in write blockstore; fall through.
} else {
return err // propagate errors, or nil, i.e. found.
}
2021-02-28 22:20:29 +00:00
return bs.read.View(c, callback)
}
func (bs *BufferedBlockstore) Get(c cid.Cid) (block.Block, error) {
2020-11-03 22:02:01 +00:00
if out, err := bs.write.Get(c); err != nil {
if err != ErrNotFound {
2019-07-05 14:29:17 +00:00
return nil, err
}
} else {
return out, nil
}
2020-11-03 22:02:01 +00:00
return bs.read.Get(c)
2019-07-05 14:29:17 +00:00
}
func (bs *BufferedBlockstore) GetSize(c cid.Cid) (int, error) {
2020-07-22 19:26:57 +00:00
s, err := bs.read.GetSize(c)
if err == ErrNotFound || s == 0 {
2020-07-22 19:26:57 +00:00
return bs.write.GetSize(c)
}
2020-07-23 08:23:44 +00:00
return s, err
2019-07-05 14:29:17 +00:00
}
func (bs *BufferedBlockstore) Put(blk block.Block) error {
2020-11-03 22:02:01 +00:00
has, err := bs.read.Has(blk.Cid()) // TODO: consider dropping this check
if err != nil {
return err
}
if has {
return nil
}
2019-07-05 14:29:17 +00:00
return bs.write.Put(blk)
}
func (bs *BufferedBlockstore) Has(c cid.Cid) (bool, error) {
2020-11-03 22:02:01 +00:00
has, err := bs.write.Has(c)
2019-07-05 14:29:17 +00:00
if err != nil {
return false, err
}
if has {
return true, nil
}
2020-11-03 22:02:01 +00:00
return bs.read.Has(c)
2019-07-05 14:29:17 +00:00
}
func (bs *BufferedBlockstore) HashOnRead(hor bool) {
2019-07-05 14:29:17 +00:00
bs.read.HashOnRead(hor)
bs.write.HashOnRead(hor)
}
func (bs *BufferedBlockstore) PutMany(blks []block.Block) error {
2019-07-05 14:29:17 +00:00
return bs.write.PutMany(blks)
2019-07-05 14:36:08 +00:00
}
func (bs *BufferedBlockstore) Read() Blockstore {
return bs.read
}