Implement an autobatcher
This commit is contained in:
parent
19bd9cf945
commit
25768a291e
93
blockstore/autobatch.go
Normal file
93
blockstore/autobatch.go
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
package blockstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
block "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
)
|
||||||
|
|
||||||
|
// autolog is a logger for the autobatching blockstore. It is subscoped from the
|
||||||
|
// blockstore logger.
|
||||||
|
var autolog = log.Named("auto")
|
||||||
|
|
||||||
|
type AutobatchBlockstore struct {
|
||||||
|
bufferedBlks []block.Block
|
||||||
|
bufferedBlksLk sync.Mutex
|
||||||
|
flushLk sync.Mutex
|
||||||
|
backingBs Blockstore
|
||||||
|
bufferCapacity int
|
||||||
|
bufferSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) GetSize(context.Context, cid.Cid) (int, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) PutMany(context.Context, []block.Block) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
|
||||||
|
bs := &AutobatchBlockstore{
|
||||||
|
backingBs: backingBs,
|
||||||
|
bufferCapacity: bufferCapacity,
|
||||||
|
}
|
||||||
|
|
||||||
|
return bs
|
||||||
|
}
|
||||||
|
|
||||||
|
// May NOT `Get` blocks that have been `Put` into this store
|
||||||
|
// Only guaranteed to fetch those that were already in the backingBs at creation of this store and those at the most recent `Flush`
|
||||||
|
func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
|
||||||
|
return bs.backingBs.Get(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
|
||||||
|
// TODO: Would it be faster to check if bs.backing Has the blk (and skip if so)?
|
||||||
|
bs.bufferedBlksLk.Lock()
|
||||||
|
bs.bufferedBlks = append(bs.bufferedBlks, blk)
|
||||||
|
bs.bufferSize += len(blk.RawData())
|
||||||
|
if bs.bufferSize >= bs.bufferCapacity {
|
||||||
|
// time to flush
|
||||||
|
go bs.Flush(ctx)
|
||||||
|
}
|
||||||
|
bs.bufferedBlksLk.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *AutobatchBlockstore) Flush(ctx context.Context) {
|
||||||
|
bs.flushLk.Lock()
|
||||||
|
defer bs.flushLk.Unlock()
|
||||||
|
bs.bufferedBlksLk.Lock()
|
||||||
|
toFlush := bs.bufferedBlks
|
||||||
|
bs.bufferedBlks = []block.Block{}
|
||||||
|
bs.bufferedBlksLk.Unlock()
|
||||||
|
// error?????
|
||||||
|
bs.backingBs.PutMany(ctx, toFlush)
|
||||||
|
}
|
33
blockstore/autobatch_test.go
Normal file
33
blockstore/autobatch_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package blockstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAutobatchBlockstore(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ab := NewAutobatch(ctx, NewMemory(), len(b0.RawData())+len(b1.RawData())-1)
|
||||||
|
|
||||||
|
require.NoError(t, ab.Put(ctx, b0))
|
||||||
|
require.NoError(t, ab.Put(ctx, b1))
|
||||||
|
require.NoError(t, ab.Put(ctx, b2))
|
||||||
|
|
||||||
|
ab.Flush(ctx)
|
||||||
|
|
||||||
|
v0, err := ab.Get(ctx, b0.Cid())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, b0.RawData(), v0.RawData())
|
||||||
|
|
||||||
|
v1, err := ab.Get(ctx, b1.Cid())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, b1.RawData(), v1.RawData())
|
||||||
|
|
||||||
|
v2, err := ab.Get(ctx, b2.Cid())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, b2.RawData(), v2.RawData())
|
||||||
|
}
|
@ -5,7 +5,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
autobatch "github.com/application-research/go-bs-autobatch"
|
"github.com/docker/go-units"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/v6/actors/migration/nv14"
|
"github.com/filecoin-project/specs-actors/v6/actors/migration/nv14"
|
||||||
"github.com/filecoin-project/specs-actors/v7/actors/migration/nv15"
|
"github.com/filecoin-project/specs-actors/v7/actors/migration/nv15"
|
||||||
@ -1264,14 +1264,14 @@ func upgradeActorsV7Common(
|
|||||||
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
|
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
|
||||||
config nv15.Config,
|
config nv15.Config,
|
||||||
) (cid.Cid, error) {
|
) (cid.Cid, error) {
|
||||||
writeStore, err := autobatch.NewBlockstore(sm.ChainStore().StateBlockstore(), blockstore.NewMemorySync(), 100_000, 100, true)
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, xerrors.Errorf("failed to create writeStore: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
|
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||||
store := store.ActorStore(ctx, buf)
|
defer cancel()
|
||||||
|
|
||||||
|
writeStore := blockstore.NewAutobatch(ctxWithCancel, sm.ChainStore().StateBlockstore(), units.GiB)
|
||||||
|
// TODO: pretty sure we'd achieve nothing by doing this, confirm in review
|
||||||
|
//buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
|
||||||
|
store := store.ActorStore(ctx, writeStore)
|
||||||
// Load the state root.
|
// Load the state root.
|
||||||
var stateRoot types.StateRoot
|
var stateRoot types.StateRoot
|
||||||
if err := store.Get(ctx, root, &stateRoot); err != nil {
|
if err := store.Get(ctx, root, &stateRoot); err != nil {
|
||||||
@ -1303,14 +1303,7 @@ func upgradeActorsV7Common(
|
|||||||
|
|
||||||
// Persist the new tree.
|
// Persist the new tree.
|
||||||
|
|
||||||
{
|
writeStore.Flush(ctx)
|
||||||
from := buf
|
|
||||||
to := buf.Read()
|
|
||||||
|
|
||||||
if err := vm.Copy(ctx, from, to, newRoot); err != nil {
|
|
||||||
return cid.Undef, xerrors.Errorf("copying migrated tree: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return newRoot, nil
|
return newRoot, nil
|
||||||
}
|
}
|
||||||
|
1
go.mod
1
go.mod
@ -11,7 +11,6 @@ require (
|
|||||||
github.com/StackExchange/wmi v1.2.1 // indirect
|
github.com/StackExchange/wmi v1.2.1 // indirect
|
||||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
|
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
|
||||||
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
|
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
|
||||||
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402
|
|
||||||
github.com/buger/goterm v1.0.3
|
github.com/buger/goterm v1.0.3
|
||||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
|
||||||
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
|
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
|
||||||
|
2
go.sum
2
go.sum
@ -98,8 +98,6 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
|
|||||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||||
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||||
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402 h1:8l0IQrh8vwqihv5jNhKCYB+YGH5hGGFL7od/2ETWrZw=
|
|
||||||
github.com/application-research/go-bs-autobatch v0.0.0-20211215020302-c4c0b68ef402/go.mod h1:86iZMWoyMLfLpYyd0aMPyECUpFwf8oZRjS5jJ9quZ7I=
|
|
||||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||||
|
Loading…
Reference in New Issue
Block a user