diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go index 5f64979a2..e7694f931 100644 --- a/blockstore/autobatch.go +++ b/blockstore/autobatch.go @@ -3,6 +3,7 @@ package blockstore import ( "context" "sync" + "time" "golang.org/x/xerrors" @@ -15,24 +16,39 @@ import ( var autolog = log.Named("auto") type AutobatchBlockstore struct { - bufferedBlks []block.Block - addedCids map[cid.Cid]struct{} - bufferedBlksLk sync.Mutex - flushCh chan struct{} - flushErr error - shutdownCh chan struct{} - flushCtx context.Context - backingBs Blockstore + // TODO: drop if memory consumption is too high + addedCids map[cid.Cid]struct{} + + bufferedLk sync.Mutex + bufferedBlksOrdered []block.Block + bufferedBlksMap map[cid.Cid]block.Block + + flushingLk sync.Mutex + flushingBlksMap map[cid.Cid]block.Block + + flushCh chan struct{} + flushErr error + flushRetryDelay time.Duration + flushCtx context.Context + shutdownCh chan struct{} + + backingBs Blockstore + bufferCapacity int bufferSize int } func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore { bs := &AutobatchBlockstore{ - backingBs: backingBs, - bufferCapacity: bufferCapacity, - addedCids: make(map[cid.Cid]struct{}), - flushCtx: ctx, + addedCids: make(map[cid.Cid]struct{}), + backingBs: backingBs, + bufferCapacity: bufferCapacity, + bufferedBlksMap: make(map[cid.Cid]block.Block), + flushingBlksMap: make(map[cid.Cid]block.Block), + flushCtx: ctx, + flushCh: make(chan struct{}, 1), + // could be made configable + flushRetryDelay: time.Second * 5, } go bs.flushWorker() @@ -41,11 +57,12 @@ func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) } func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { - bs.bufferedBlksLk.Lock() + bs.bufferedLk.Lock() _, ok := bs.addedCids[blk.Cid()] if !ok { - bs.bufferedBlks = append(bs.bufferedBlks, blk) bs.addedCids[blk.Cid()] = struct{}{} + bs.bufferedBlksOrdered = append(bs.bufferedBlksOrdered, blk) + bs.bufferedBlksMap[blk.Cid()] = blk bs.bufferSize += len(blk.RawData()) if bs.bufferSize >= bs.bufferCapacity { // signal that a flush is appropriate, may be ignored @@ -56,7 +73,7 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { } } } - bs.bufferedBlksLk.Unlock() + bs.bufferedLk.Unlock() return nil } @@ -65,9 +82,16 @@ func (bs *AutobatchBlockstore) flushWorker() { select { case <-bs.flushCh: putErr := bs.doFlush(bs.flushCtx) - if putErr != nil { - autolog.Errorf("FLUSH ERRORED: %w", putErr) - bs.flushErr = xerrors.Errorf("%w, put error: %w", bs.flushErr, putErr) + for putErr != nil { + select { + case <-bs.shutdownCh: + bs.flushErr = putErr + return + default: + autolog.Errorf("FLUSH ERRORED: %w, retrying in %v", putErr, bs.flushRetryDelay) + time.Sleep(bs.flushRetryDelay) + putErr = bs.doFlush(bs.flushCtx) + } } case <-bs.shutdownCh: return @@ -76,20 +100,24 @@ func (bs *AutobatchBlockstore) flushWorker() { } func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error { - bs.bufferedBlksLk.Lock() + bs.bufferedLk.Lock() + bs.flushingLk.Lock() // We do NOT clear addedCids here, because its purpose is to expedite Puts - toFlush := bs.bufferedBlks - bs.bufferedBlks = []block.Block{} - bs.bufferedBlksLk.Unlock() - return bs.backingBs.PutMany(ctx, toFlush) -} - -func (bs *AutobatchBlockstore) Flush(ctx context.Context) error { - return bs.doFlush(ctx) + flushingBlksOrdered := bs.bufferedBlksOrdered + bs.flushingBlksMap = bs.bufferedBlksMap + bs.bufferedBlksOrdered = []block.Block{} + bs.bufferedBlksMap = make(map[cid.Cid]block.Block) + bs.bufferedLk.Unlock() + bs.flushingLk.Unlock() + return bs.backingBs.PutMany(ctx, flushingBlksOrdered) } func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { + // request one last flush of the worker + bs.flushCh <- struct{}{} + // shutdown the flush worker bs.shutdownCh <- struct{}{} + // if it ever errored, this method fails if bs.flushErr != nil { return xerrors.Errorf("flushWorker errored: %w", bs.flushErr) } @@ -98,8 +126,8 @@ func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { return bs.doFlush(ctx) } -// May be very slow if the cid queried wasn't in the backingBs at the time of creation of this AutobatchBlockstore func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { + // may seem backward to check the backingBs first, but that is the likeliest case blk, err := bs.backingBs.Get(ctx, c) if err == nil { return blk, nil @@ -109,16 +137,24 @@ func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, return blk, err } - bs.Flush(ctx) - return bs.backingBs.Get(ctx, c) + bs.flushingLk.Lock() + v, ok := bs.flushingBlksMap[c] + bs.flushingLk.Unlock() + if ok { + return v, nil + } + + bs.bufferedLk.Lock() + v, ok = bs.bufferedBlksMap[c] + bs.bufferedLk.Unlock() + if ok { + return v, nil + } + + return nil, ErrNotFound } func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { - // if we wanted to support this, we would have to: - // - flush - // - delete from the backingBs (if present) - // - remove from addedCids (if present) - // - if present in addedCids, also walk bufferedBlks and remove if present return xerrors.New("deletion is unsupported") } @@ -159,8 +195,8 @@ func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) } func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - bs.Flush(ctx) - return bs.backingBs.AllKeysChan(ctx) + return nil, xerrors.New("unsupported") + } func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { @@ -168,6 +204,5 @@ func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { } func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { - bs.Flush(ctx) - return bs.backingBs.View(ctx, cid, callback) + return xerrors.New("unsupported") } diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 72b1605fa..066fb8a50 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -1297,11 +1297,7 @@ func upgradeActorsV7Common( return cid.Undef, xerrors.Errorf("failed to persist new state root: %w", err) } - // Persist the new tree. Blocks until the entire writeStore is in the state blockstore. - if err := writeStore.Flush(ctx); err != nil { - return cid.Undef, xerrors.Errorf("failed to flush writestore: %w", err) - } - + // Persists the new tree and shuts down the flush worker if err := writeStore.Shutdown(ctx); err != nil { return cid.Undef, xerrors.Errorf("writeStore failed: %w", err) }