diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go index ed52f39cd..5f64979a2 100644 --- a/blockstore/autobatch.go +++ b/blockstore/autobatch.go @@ -18,7 +18,10 @@ type AutobatchBlockstore struct { bufferedBlks []block.Block addedCids map[cid.Cid]struct{} bufferedBlksLk sync.Mutex - flushLk sync.Mutex + flushCh chan struct{} + flushErr error + shutdownCh chan struct{} + flushCtx context.Context backingBs Blockstore bufferCapacity int bufferSize int @@ -29,8 +32,11 @@ func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) backingBs: backingBs, bufferCapacity: bufferCapacity, addedCids: make(map[cid.Cid]struct{}), + flushCtx: ctx, } + go bs.flushWorker() + return bs } @@ -42,24 +48,54 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { bs.addedCids[blk.Cid()] = struct{}{} bs.bufferSize += len(blk.RawData()) if bs.bufferSize >= bs.bufferCapacity { - // time to flush - go bs.Flush(ctx) + // signal that a flush is appropriate, may be ignored + select { + case bs.flushCh <- struct{}{}: + default: + // do nothing + } } } bs.bufferedBlksLk.Unlock() return nil } -func (bs *AutobatchBlockstore) Flush(ctx context.Context) { - bs.flushLk.Lock() - defer bs.flushLk.Unlock() +func (bs *AutobatchBlockstore) flushWorker() { + for { + select { + case <-bs.flushCh: + putErr := bs.doFlush(bs.flushCtx) + if putErr != nil { + autolog.Errorf("FLUSH ERRORED: %w", putErr) + bs.flushErr = xerrors.Errorf("%w, put error: %w", bs.flushErr, putErr) + } + case <-bs.shutdownCh: + return + } + } +} + +func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error { bs.bufferedBlksLk.Lock() // We do NOT clear addedCids here, because its purpose is to expedite Puts toFlush := bs.bufferedBlks bs.bufferedBlks = []block.Block{} bs.bufferedBlksLk.Unlock() - err := bs.backingBs.PutMany(ctx, toFlush) - autolog.Errorf("FLUSH ERRORED, maybe async: %w", err) + return bs.backingBs.PutMany(ctx, toFlush) +} + +func (bs *AutobatchBlockstore) Flush(ctx context.Context) error { + return bs.doFlush(ctx) +} + +func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { + bs.shutdownCh <- struct{}{} + if bs.flushErr != nil { + return xerrors.Errorf("flushWorker errored: %w", bs.flushErr) + } + + // one last flush in case it's needed + return bs.doFlush(ctx) } // May be very slow if the cid queried wasn't in the backingBs at the time of creation of this AutobatchBlockstore diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index c7b407a6d..72b1605fa 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -1298,7 +1298,13 @@ func upgradeActorsV7Common( } // Persist the new tree. Blocks until the entire writeStore is in the state blockstore. - writeStore.Flush(ctx) + if err := writeStore.Flush(ctx); err != nil { + return cid.Undef, xerrors.Errorf("failed to flush writestore: %w", err) + } + + if err := writeStore.Shutdown(ctx); err != nil { + return cid.Undef, xerrors.Errorf("writeStore failed: %w", err) + } return newRoot, nil }