diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go index e7694f931..cd3991246 100644 --- a/blockstore/autobatch.go +++ b/blockstore/autobatch.go @@ -15,19 +15,25 @@ import ( // blockstore logger. var autolog = log.Named("auto") +// contains the same set of blocks twice, once as an ordered list for flushing, and as a map for fast access +type blockBatch struct { + blockList []block.Block + blockMap map[cid.Cid]block.Block +} + type AutobatchBlockstore struct { // TODO: drop if memory consumption is too high addedCids map[cid.Cid]struct{} - bufferedLk sync.Mutex - bufferedBlksOrdered []block.Block - bufferedBlksMap map[cid.Cid]block.Block + lock sync.Mutex + bufferedBatch blockBatch - flushingLk sync.Mutex - flushingBlksMap map[cid.Cid]block.Block + // the flush worker has sole control (including read) over the flushingBatch.blockList and flushErr until shutdown + flushingBatch blockBatch + flushErr error + + flushCh chan struct{} - flushCh chan struct{} - flushErr error flushRetryDelay time.Duration flushCtx context.Context shutdownCh chan struct{} @@ -40,29 +46,32 @@ type AutobatchBlockstore struct { func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore { bs := &AutobatchBlockstore{ - addedCids: make(map[cid.Cid]struct{}), - backingBs: backingBs, - bufferCapacity: bufferCapacity, - bufferedBlksMap: make(map[cid.Cid]block.Block), - flushingBlksMap: make(map[cid.Cid]block.Block), - flushCtx: ctx, - flushCh: make(chan struct{}, 1), + addedCids: make(map[cid.Cid]struct{}), + backingBs: backingBs, + bufferCapacity: bufferCapacity, + flushCtx: ctx, + flushCh: make(chan struct{}, 1), // could be made configable - flushRetryDelay: time.Second * 5, + flushRetryDelay: time.Millisecond * 100, } + bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block) + bs.flushingBatch.blockMap = make(map[cid.Cid]block.Block) + go bs.flushWorker() return bs } func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { - bs.bufferedLk.Lock() + bs.lock.Lock() + defer bs.lock.Unlock() + _, ok := bs.addedCids[blk.Cid()] if !ok { bs.addedCids[blk.Cid()] = struct{}{} - bs.bufferedBlksOrdered = append(bs.bufferedBlksOrdered, blk) - bs.bufferedBlksMap[blk.Cid()] = blk + bs.bufferedBatch.blockList = append(bs.bufferedBatch.blockList, blk) + bs.bufferedBatch.blockMap[blk.Cid()] = blk bs.bufferSize += len(blk.RawData()) if bs.bufferSize >= bs.bufferCapacity { // signal that a flush is appropriate, may be ignored @@ -73,7 +82,7 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { } } } - bs.bufferedLk.Unlock() + return nil } @@ -85,11 +94,9 @@ func (bs *AutobatchBlockstore) flushWorker() { for putErr != nil { select { case <-bs.shutdownCh: - bs.flushErr = putErr return - default: - autolog.Errorf("FLUSH ERRORED: %w, retrying in %v", putErr, bs.flushRetryDelay) - time.Sleep(bs.flushRetryDelay) + case <-time.After(bs.flushRetryDelay): + autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay) putErr = bs.doFlush(bs.flushCtx) } } @@ -99,31 +106,31 @@ func (bs *AutobatchBlockstore) flushWorker() { } } +// caller must NOT hold lock func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error { - bs.bufferedLk.Lock() - bs.flushingLk.Lock() - // We do NOT clear addedCids here, because its purpose is to expedite Puts - flushingBlksOrdered := bs.bufferedBlksOrdered - bs.flushingBlksMap = bs.bufferedBlksMap - bs.bufferedBlksOrdered = []block.Block{} - bs.bufferedBlksMap = make(map[cid.Cid]block.Block) - bs.bufferedLk.Unlock() - bs.flushingLk.Unlock() - return bs.backingBs.PutMany(ctx, flushingBlksOrdered) + if bs.flushErr == nil { + bs.lock.Lock() + // We do NOT clear addedCids here, because its purpose is to expedite Puts + bs.flushingBatch = bs.bufferedBatch + bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList)) + bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap)) + bs.lock.Unlock() + } + + bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) + return bs.flushErr +} + +// caller must NOT hold lock +func (bs *AutobatchBlockstore) Flush(ctx context.Context) error { + return bs.doFlush(ctx) } func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { - // request one last flush of the worker - bs.flushCh <- struct{}{} // shutdown the flush worker bs.shutdownCh <- struct{}{} - // if it ever errored, this method fails - if bs.flushErr != nil { - return xerrors.Errorf("flushWorker errored: %w", bs.flushErr) - } - // one last flush in case it's needed - return bs.doFlush(ctx) + return bs.flushErr } func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { @@ -137,24 +144,28 @@ func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, return blk, err } - bs.flushingLk.Lock() - v, ok := bs.flushingBlksMap[c] - bs.flushingLk.Unlock() + bs.lock.Lock() + defer bs.lock.Unlock() + v, ok := bs.flushingBatch.blockMap[c] if ok { return v, nil } - bs.bufferedLk.Lock() - v, ok = bs.bufferedBlksMap[c] - bs.bufferedLk.Unlock() + v, ok = bs.flushingBatch.blockMap[c] if ok { return v, nil } - return nil, ErrNotFound + // check the backingBs in case it just got put in the backingBs (and removed from the batch maps) while we were here + return bs.backingBs.Get(ctx, c) } func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { + // if we wanted to support this, we would have to: + // - flush + // - delete from the backingBs (if present) + // - remove from addedCids (if present) + // - if present in addedCids, also walk the ordered lists and remove if present return xerrors.New("deletion is unsupported") } @@ -195,8 +206,11 @@ func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) } func (bs *AutobatchBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return nil, xerrors.New("unsupported") + if err := bs.Flush(ctx); err != nil { + return nil, err + } + return bs.backingBs.AllKeysChan(ctx) } func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { @@ -204,5 +218,9 @@ func (bs *AutobatchBlockstore) HashOnRead(enabled bool) { } func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { - return xerrors.New("unsupported") + if err := bs.Flush(ctx); err != nil { + return err + } + + return bs.backingBs.View(ctx, cid, callback) } diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 066fb8a50..a21bdb05e 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -1298,8 +1298,12 @@ func upgradeActorsV7Common( } // Persists the new tree and shuts down the flush worker + if err := writeStore.Flush(ctx); err != nil { + return cid.Undef, xerrors.Errorf("writeStore flush failed: %w", err) + } + if err := writeStore.Shutdown(ctx); err != nil { - return cid.Undef, xerrors.Errorf("writeStore failed: %w", err) + return cid.Undef, xerrors.Errorf("writeStore shutdown failed: %w", err) } return newRoot, nil