diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go new file mode 100644 index 000000000..308b98b4b --- /dev/null +++ b/blockstore/autobatch.go @@ -0,0 +1,93 @@ +package blockstore + +import ( + "context" + "sync" + + block "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +// autolog is a logger for the autobatching blockstore. It is subscoped from the +// blockstore logger. +var autolog = log.Named("auto") + +type AutobatchBlockstore struct { + bufferedBlks []block.Block + bufferedBlksLk sync.Mutex + flushLk sync.Mutex + backingBs Blockstore + bufferCapacity int + bufferSize int +} + +func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { + panic("implement me") +} + +func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + panic("implement me") +} + +func (bs *AutobatchBlockstore) GetSize(context.Context, cid.Cid) (int, error) { + panic("implement me") +} + +func (bs *AutobatchBlockstore) PutMany(context.Context, []block.Block) error { + panic("implement me") +} + +func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + panic("implement me") +} + +func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { + panic("implement me") +} + +func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { + panic("implement me") +} + +func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + panic("implement me") +} + +func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore { + bs := &AutobatchBlockstore{ + backingBs: backingBs, + bufferCapacity: bufferCapacity, + } + + return bs +} + +// May NOT `Get` blocks that have been `Put` into this store +// Only guaranteed to fetch those that were already in the backingBs at creation of this store and those at the most recent `Flush` +func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { + return bs.backingBs.Get(ctx, c) +} + +func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { + // TODO: Would it be faster to check if bs.backing Has the blk (and skip if so)? + bs.bufferedBlksLk.Lock() + bs.bufferedBlks = append(bs.bufferedBlks, blk) + bs.bufferSize += len(blk.RawData()) + if bs.bufferSize >= bs.bufferCapacity { + // time to flush + go bs.Flush(ctx) + } + bs.bufferedBlksLk.Unlock() + return nil +} + +func (bs *AutobatchBlockstore) Flush(ctx context.Context) { + bs.flushLk.Lock() + defer bs.flushLk.Unlock() + bs.bufferedBlksLk.Lock() + toFlush := bs.bufferedBlks + bs.bufferedBlks = []block.Block{} + bs.bufferedBlksLk.Unlock() + // error????? + bs.backingBs.PutMany(ctx, toFlush) +} diff --git a/blockstore/autobatch_test.go b/blockstore/autobatch_test.go new file mode 100644 index 000000000..fe52c55c7 --- /dev/null +++ b/blockstore/autobatch_test.go @@ -0,0 +1,33 @@ +package blockstore + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAutobatchBlockstore(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1) + + require.NoError(t, ab.Put(ctx, b0)) + require.NoError(t, ab.Put(ctx, b1)) + require.NoError(t, ab.Put(ctx, b2)) + + ab.Flush(ctx) + + v0, err := ab.Get(ctx, b0.Cid()) + require.NoError(t, err) + require.Equal(t, b0.RawData(), v0.RawData()) + + v1, err := ab.Get(ctx, b1.Cid()) + require.NoError(t, err) + require.Equal(t, b1.RawData(), v1.RawData()) + + v2, err := ab.Get(ctx, b2.Cid()) + require.NoError(t, err) + require.Equal(t, b2.RawData(), v2.RawData()) +} diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index aae226019..4e65653ad 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -5,7 +5,7 @@ import ( "runtime" "time" - autobatch "github.com/application-research/go-bs-autobatch" + "github.com/docker/go-units" "github.com/filecoin-project/specs-actors/v6/actors/migration/nv14" "github.com/filecoin-project/specs-actors/v7/actors/migration/nv15" @@ -1264,14 +1264,14 @@ func upgradeActorsV7Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv15.Config, ) (cid.Cid, error) { - writeStore, err := autobatch.NewBlockstore(sm.ChainStore().StateBlockstore(), blockstore.NewMemorySync(), 100_000, 100, true) - if err != nil { - return cid.Undef, xerrors.Errorf("failed to create writeStore: %w", err) - } - buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore) - store := store.ActorStore(ctx, buf) + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + writeStore := blockstore.NewAutobatch(ctxWithCancel, sm.ChainStore().StateBlockstore(), units.GiB) + // TODO: pretty sure we'd achieve nothing by doing this, confirm in review + //buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore) + store := store.ActorStore(ctx, writeStore) // Load the state root. var stateRoot types.StateRoot if err := store.Get(ctx, root, &stateRoot); err != nil { @@ -1303,14 +1303,7 @@ func upgradeActorsV7Common( // Persist the new tree. - { - from := buf - to := buf.Read() - - if err := vm.Copy(ctx, from, to, newRoot); err != nil { - return cid.Undef, xerrors.Errorf("copying migrated tree: %w", err) - } - } + writeStore.Flush(ctx) return newRoot, nil } diff --git a/go.mod b/go.mod index 62de058e8..4b8f942a6 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921 - github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402 github.com/buger/goterm v1.0.3 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/cockroachdb/pebble v0.0.0-20201001221639-879f3bfeef07 diff --git a/go.sum b/go.sum index 058c5957f..c085ec5ff 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,6 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= -github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402 h1:8l0IQrh8vwqihv5jNhKCYB+YGH5hGGFL7od/2ETWrZw= -github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402/go.mod h1:86iZMWoyMLfLpYyd0aMPyECUpFwf8oZRjS5jJ9quZ7I= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=