From 3e8e9273cacb2c2a36f337f93e5cd3a6a2dcb458 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 2 Jul 2021 11:37:35 +0300 Subject: [PATCH] track all writes using async batching, not just implicit ones --- blockstore/splitstore/splitstore.go | 80 +++++++++++------------------ 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index ea7f21ea0..912a808d6 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -149,8 +149,8 @@ type SplitStore struct { txnEnv MarkSetEnv txnProtect MarkSet - // implicit write set - implicitWrites map[cid.Cid]struct{} + // pending write set + pendingWrites map[cid.Cid]struct{} } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -192,7 +192,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co coldPurgeSize: defaultColdPurgeSize, - implicitWrites: make(map[cid.Cid]struct{}), + pendingWrites: make(map[cid.Cid]struct{}), } ss.ctx, ss.cancel = context.WithCancel(context.Background()) @@ -234,7 +234,7 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) { // an implicit Write. // Unfortunately we can't just directly tracker.Put one by one, as it is ridiculously slow with // bolot because of syncing, so we batch them - s.putImplicitWrite(cid) + s.trackWrite(cid) // also make sure the object is considered live during compaction if s.txnProtect != nil { @@ -326,28 +326,12 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { } func (s *SplitStore) Put(blk blocks.Block) error { - s.mx.Lock() - if s.curTs == nil { - s.mx.Unlock() - return s.cold.Put(blk) - } - - curTs := s.curTs - epoch := s.writeEpoch - s.mx.Unlock() - s.txnLk.RLock() defer s.txnLk.RUnlock() - err := s.tracker.Put(blk.Cid(), epoch) - if err != nil { - log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err) - return s.cold.Put(blk) - } + s.trackWrite(blk.Cid()) - s.debug.LogWrite(curTs, blk.Cid(), epoch) - - err = s.hot.Put(blk) + err := s.hot.Put(blk) if err == nil && s.txnProtect != nil { err = s.txnProtect.Mark(blk.Cid()) if err != nil { @@ -363,16 +347,6 @@ func (s *SplitStore) Put(blk blocks.Block) error { } func (s *SplitStore) PutMany(blks []blocks.Block) error { - s.mx.Lock() - if s.curTs == nil { - s.mx.Unlock() - return s.cold.PutMany(blks) - } - - curTs := s.curTs - epoch := s.writeEpoch - s.mx.Unlock() - batch := make([]cid.Cid, 0, len(blks)) for _, blk := range blks { batch = append(batch, blk.Cid()) @@ -381,15 +355,9 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error { s.txnLk.RLock() defer s.txnLk.RUnlock() - err := s.tracker.PutBatch(batch, epoch) - if err != nil { - log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err) - return s.cold.PutMany(blks) - } + s.trackWrites(batch) - s.debug.LogWriteMany(curTs, batch, epoch) - - err = s.hot.PutMany(blks) + err := s.hot.PutMany(blks) if err == nil && s.txnProtect != nil { for _, cid := range batch { err2 := s.txnProtect.Mark(cid) @@ -562,7 +530,7 @@ func (s *SplitStore) Close() error { } } - s.flushImplicitWrites(false) + s.flushPendingWrites(false) s.cancel() return multierr.Combine(s.tracker.Close(), s.env.Close(), s.debug.Close()) } @@ -623,7 +591,7 @@ func (s *SplitStore) updateWriteEpoch() { if dt < 0 { writeEpoch := curTs.Height() + 1 if writeEpoch > s.writeEpoch { - s.flushImplicitWrites(true) + s.flushPendingWrites(true) s.writeEpoch = writeEpoch } @@ -632,33 +600,42 @@ func (s *SplitStore) updateWriteEpoch() { writeEpoch := curTs.Height() + abi.ChainEpoch(dt.Seconds())/builtin.EpochDurationSeconds + 1 if writeEpoch > s.writeEpoch { - s.flushImplicitWrites(true) + s.flushPendingWrites(true) s.writeEpoch = writeEpoch } } -func (s *SplitStore) putImplicitWrite(c cid.Cid) { +func (s *SplitStore) trackWrite(c cid.Cid) { s.mx.Lock() defer s.mx.Unlock() - s.implicitWrites[c] = struct{}{} + s.pendingWrites[c] = struct{}{} } -func (s *SplitStore) flushImplicitWrites(locked bool) { +func (s *SplitStore) trackWrites(cids []cid.Cid) { + s.mx.Lock() + defer s.mx.Unlock() + + for _, c := range cids { + s.pendingWrites[c] = struct{}{} + } +} + +func (s *SplitStore) flushPendingWrites(locked bool) { if !locked { s.mx.Lock() defer s.mx.Unlock() } - if len(s.implicitWrites) == 0 { + if len(s.pendingWrites) == 0 { return } - cids := make([]cid.Cid, 0, len(s.implicitWrites)) - for c := range s.implicitWrites { + cids := make([]cid.Cid, 0, len(s.pendingWrites)) + for c := range s.pendingWrites { cids = append(cids, c) } - s.implicitWrites = make(map[cid.Cid]struct{}) + s.pendingWrites = make(map[cid.Cid]struct{}) epoch := s.writeEpoch curTs := s.curTs @@ -938,7 +915,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer s.debug.Flush() - s.flushImplicitWrites(false) + // flush pending writes to update the tracker + s.flushPendingWrites(false) // 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)