diff --git a/blockstore/autobatch.go b/blockstore/autobatch.go index 53764d15c..6393b2e22 100644 --- a/blockstore/autobatch.go +++ b/blockstore/autobatch.go @@ -26,18 +26,17 @@ type AutobatchBlockstore struct { addedCids map[cid.Cid]struct{} stateLock sync.Mutex - doFlushLock sync.Mutex bufferedBatch blockBatch - flushingBatch blockBatch - flushErr error - flushWorkerDone bool + flushingBatch blockBatch + flushErr error flushCh chan struct{} + doFlushLock sync.Mutex flushRetryDelay time.Duration - flushCtx context.Context - shutdownCh chan struct{} + doneCh chan struct{} + shutdown context.CancelFunc backingBs Blockstore @@ -46,21 +45,21 @@ type AutobatchBlockstore struct { } func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore { + ctx, cancel := context.WithCancel(ctx) bs := &AutobatchBlockstore{ addedCids: make(map[cid.Cid]struct{}), backingBs: backingBs, bufferCapacity: bufferCapacity, - flushCtx: ctx, flushCh: make(chan struct{}, 1), - shutdownCh: make(chan struct{}), + doneCh: make(chan struct{}), // could be made configable flushRetryDelay: time.Millisecond * 100, - flushWorkerDone: false, + shutdown: cancel, } bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block) - go bs.flushWorker() + go bs.flushWorker(ctx) return bs } @@ -88,66 +87,85 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { return nil } -func (bs *AutobatchBlockstore) flushWorker() { - defer func() { - bs.stateLock.Lock() - bs.flushWorkerDone = true - bs.stateLock.Unlock() - }() +func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) { + defer close(bs.doneCh) for { select { case <-bs.flushCh: - putErr := bs.doFlush(bs.flushCtx) + // TODO: check if we _should_ actually flush. We could get a spurious wakeup + // here. + putErr := bs.doFlush(ctx, false) for putErr != nil { select { - case <-bs.shutdownCh: + case <-ctx.Done(): return case <-time.After(bs.flushRetryDelay): autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay) - putErr = bs.doFlush(bs.flushCtx) + putErr = bs.doFlush(ctx, true) } } - case <-bs.shutdownCh: + case <-ctx.Done(): + // Do one last flush. + _ = bs.doFlush(ctx, false) return } } } // caller must NOT hold stateLock -func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error { +// set retryOnly to true to only retry a failed flush and not flush anything new. +func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) error { bs.doFlushLock.Lock() defer bs.doFlushLock.Unlock() - if bs.flushErr == nil { - bs.stateLock.Lock() - // We do NOT clear addedCids here, because its purpose is to expedite Puts - bs.flushingBatch = bs.bufferedBatch - bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList)) - bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap)) - bs.stateLock.Unlock() + + // If we failed to flush last time, try flushing again. + if bs.flushErr != nil { + bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) } - bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) + // If we failed, or we're _only_ retrying, bail. + if retryOnly || bs.flushErr != nil { + return bs.flushErr + } + + // Then take the current batch... bs.stateLock.Lock() - bs.flushingBatch = blockBatch{} + // We do NOT clear addedCids here, because its purpose is to expedite Puts + bs.flushingBatch = bs.bufferedBatch + bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList)) + bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap)) bs.stateLock.Unlock() + // And try to flush it. + bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) + + // If we succeeded, reset the batch. Otherwise, we'll try again next time. + if bs.flushErr == nil { + bs.stateLock.Lock() + bs.flushingBatch = blockBatch{} + bs.stateLock.Unlock() + } + return bs.flushErr } // caller must NOT hold stateLock func (bs *AutobatchBlockstore) Flush(ctx context.Context) error { - return bs.doFlush(ctx) + return bs.doFlush(ctx, false) } func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { - bs.stateLock.Lock() - flushDone := bs.flushWorkerDone - bs.stateLock.Unlock() - if !flushDone { - // may racily block forever if Shutdown is called in parallel - bs.shutdownCh <- struct{}{} + // TODO: Prevent puts after we call this to avoid losing data. + bs.shutdown() + select { + case <-bs.doneCh: + case <-ctx.Done(): + return ctx.Err() } + bs.doFlushLock.Lock() + defer bs.doFlushLock.Unlock() + return bs.flushErr }