Implement an autobatcher

This commit is contained in:
Aayush Rajasekaran 2022-01-11 11:31:59 -05:00 committed by Jennifer Wang
parent def5cb559e
commit da39b16c83
5 changed files with 134 additions and 18 deletions

93
blockstore/autobatch.go Normal file
View File

@ -0,0 +1,93 @@
package blockstore
import (
"context"
"sync"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
// autolog is a logger for the autobatching blockstore. It is subscoped from the
// blockstore logger.
var autolog = log.Named("auto")
type AutobatchBlockstore struct {
bufferedBlks []block.Block
bufferedBlksLk sync.Mutex
flushLk sync.Mutex
backingBs Blockstore
bufferCapacity int
bufferSize int
}
func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
panic("implement me")
}
func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
panic("implement me")
}
func (bs *AutobatchBlockstore) GetSize(context.Context, cid.Cid) (int, error) {
panic("implement me")
}
func (bs *AutobatchBlockstore) PutMany(context.Context, []block.Block) error {
panic("implement me")
}
func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
panic("implement me")
}
func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
panic("implement me")
}
func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
panic("implement me")
}
func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
panic("implement me")
}
func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
bs := &AutobatchBlockstore{
backingBs: backingBs,
bufferCapacity: bufferCapacity,
}
return bs
}
// May NOT `Get` blocks that have been `Put` into this store
// Only guaranteed to fetch those that were already in the backingBs at creation of this store and those at the most recent `Flush`
func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
return bs.backingBs.Get(ctx, c)
}
func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
// TODO: Would it be faster to check if bs.backing Has the blk (and skip if so)?
bs.bufferedBlksLk.Lock()
bs.bufferedBlks = append(bs.bufferedBlks, blk)
bs.bufferSize += len(blk.RawData())
if bs.bufferSize >= bs.bufferCapacity {
// time to flush
go bs.Flush(ctx)
}
bs.bufferedBlksLk.Unlock()
return nil
}
func (bs *AutobatchBlockstore) Flush(ctx context.Context) {
bs.flushLk.Lock()
defer bs.flushLk.Unlock()
bs.bufferedBlksLk.Lock()
toFlush := bs.bufferedBlks
bs.bufferedBlks = []block.Block{}
bs.bufferedBlksLk.Unlock()
// error?????
bs.backingBs.PutMany(ctx, toFlush)
}

View File

@ -0,0 +1,33 @@
package blockstore
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestAutobatchBlockstore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1)
require.NoError(t, ab.Put(ctx, b0))
require.NoError(t, ab.Put(ctx, b1))
require.NoError(t, ab.Put(ctx, b2))
ab.Flush(ctx)
v0, err := ab.Get(ctx, b0.Cid())
require.NoError(t, err)
require.Equal(t, b0.RawData(), v0.RawData())
v1, err := ab.Get(ctx, b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), v1.RawData())
v2, err := ab.Get(ctx, b2.Cid())
require.NoError(t, err)
require.Equal(t, b2.RawData(), v2.RawData())
}

View File

@ -5,7 +5,7 @@ import (
"runtime"
"time"
autobatch "github.com/application-research/go-bs-autobatch"
"github.com/docker/go-units"
"github.com/filecoin-project/specs-actors/v6/actors/migration/nv14"
"github.com/filecoin-project/specs-actors/v7/actors/migration/nv15"
@ -1264,14 +1264,14 @@ func upgradeActorsV7Common(
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
config nv15.Config,
) (cid.Cid, error) {
writeStore, err := autobatch.NewBlockstore(sm.ChainStore().StateBlockstore(), blockstore.NewMemorySync(), 100_000, 100, true)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create writeStore: %w", err)
}
buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
store := store.ActorStore(ctx, buf)
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()
writeStore := blockstore.NewAutobatch(ctxWithCancel, sm.ChainStore().StateBlockstore(), units.GiB)
// TODO: pretty sure we'd achieve nothing by doing this, confirm in review
//buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
store := store.ActorStore(ctx, writeStore)
// Load the state root.
var stateRoot types.StateRoot
if err := store.Get(ctx, root, &stateRoot); err != nil {
@ -1303,14 +1303,7 @@ func upgradeActorsV7Common(
// Persist the new tree.
{
from := buf
to := buf.Read()
if err := vm.Copy(ctx, from, to, newRoot); err != nil {
return cid.Undef, xerrors.Errorf("copying migrated tree: %w", err)
}
}
writeStore.Flush(ctx)
return newRoot, nil
}

1
go.mod
View File

@ -12,7 +12,6 @@ require (
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402
github.com/buger/goterm v1.0.3
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327

2
go.sum
View File

@ -100,8 +100,6 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402 h1:8l0IQrh8vwqihv5jNhKCYB+YGH5hGGFL7od/2ETWrZw=
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402/go.mod h1:86iZMWoyMLfLpYyd0aMPyECUpFwf8oZRjS5jJ9quZ7I=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=