diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go index 5d78e92a1..ed52f39cd 100644 --- a/blockstore/autobatch.go +++ b/blockstore/autobatch.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "golang.org/x/xerrors" + block "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ) @@ -22,38 +24,6 @@ type AutobatchBlockstore struct { bufferSize int } -func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { - panic("implement me") -} - -func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { - panic("implement me") -} - -func (bs *AutobatchBlockstore) GetSize(context.Context, cid.Cid) (int, error) { - panic("implement me") -} - -func (bs *AutobatchBlockstore) PutMany(context.Context, []block.Block) error { - panic("implement me") -} - -func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - panic("implement me") -} - -func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { - panic("implement me") -} - -func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { - panic("implement me") -} - -func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { - panic("implement me") -} - func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore { bs := &AutobatchBlockstore{ backingBs: backingBs, @@ -64,12 +34,6 @@ func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) return bs } -// May NOT `Get` blocks that have been `Put` into this store -// Only guaranteed to fetch those that were already in the backingBs at creation of this store and those at the most recent `Flush` -func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { - return bs.backingBs.Get(ctx, c) -} - func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { bs.bufferedBlksLk.Lock() _, ok := bs.addedCids[blk.Cid()] @@ -90,9 +54,84 @@ func (bs *AutobatchBlockstore) Flush(ctx context.Context) { bs.flushLk.Lock() defer bs.flushLk.Unlock() bs.bufferedBlksLk.Lock() + // We do NOT clear addedCids here, because its purpose is to expedite Puts toFlush := bs.bufferedBlks bs.bufferedBlks = []block.Block{} bs.bufferedBlksLk.Unlock() - // error????? - bs.backingBs.PutMany(ctx, toFlush) + err := bs.backingBs.PutMany(ctx, toFlush) + autolog.Errorf("FLUSH ERRORED, maybe async: %w", err) +} + +// May be very slow if the cid queried wasn't in the backingBs at the time of creation of this AutobatchBlockstore +func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { + blk, err := bs.backingBs.Get(ctx, c) + if err == nil { + return blk, nil + } + + if err != ErrNotFound { + return blk, err + } + + bs.Flush(ctx) + return bs.backingBs.Get(ctx, c) +} + +func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { + // if we wanted to support this, we would have to: + // - flush + // - delete from the backingBs (if present) + // - remove from addedCids (if present) + // - if present in addedCids, also walk bufferedBlks and remove if present + return xerrors.New("deletion is unsupported") +} + +func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + // see note in DeleteBlock() + return xerrors.New("deletion is unsupported") +} + +func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + _, err := bs.Get(ctx, c) + if err == nil { + return true, nil + } + if err == ErrNotFound { + return false, nil + } + + return false, err +} + +func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + blk, err := bs.Get(ctx, c) + if err != nil { + return 0, err + } + + return len(blk.RawData()), nil +} + +func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error { + for _, blk := range blks { + if err := bs.Put(ctx, blk); err != nil { + return err + } + } + + return nil +} + +func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + bs.Flush(ctx) + return bs.backingBs.AllKeysChan(ctx) +} + +func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { + bs.backingBs.HashOnRead(enabled) +} + +func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { + bs.Flush(ctx) + return bs.backingBs.View(ctx, cid, callback) } diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 548f59aac..c7b407a6d 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -1264,11 +1264,7 @@ func upgradeActorsV7Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv15.Config, ) (cid.Cid, error) { - - ctxWithCancel, cancel := context.WithCancel(ctx) - defer cancel() - - writeStore := blockstore.NewAutobatch(ctxWithCancel, sm.ChainStore().StateBlockstore(), units.GiB) + writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB) // TODO: pretty sure we'd achieve nothing by doing this, confirm in review //buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore) store := store.ActorStore(ctx, writeStore) @@ -1301,8 +1297,7 @@ func upgradeActorsV7Common( return cid.Undef, xerrors.Errorf("failed to persist new state root: %w", err) } - // Persist the new tree. - + // Persist the new tree. Blocks until the entire writeStore is in the state blockstore. writeStore.Flush(ctx) return newRoot, nil