use a single map for tracking pending writes, properly track implicits

This commit is contained in:
vyzo 2021-07-03 10:27:03 +03:00
parent 5834231e58
commit 39723bbe60

View File

@ -156,8 +156,7 @@ type SplitStore struct {
txnProtect MarkSet txnProtect MarkSet
// pending write set // pending write set
pendingWrites map[cid.Cid]struct{} pendingWrites map[cid.Cid]bool
pendingWritesImplicit map[cid.Cid]struct{}
} }
var _ bstore.Blockstore = (*SplitStore)(nil) var _ bstore.Blockstore = (*SplitStore)(nil)
@ -199,7 +198,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
coldPurgeSize: defaultColdPurgeSize, coldPurgeSize: defaultColdPurgeSize,
pendingWrites: make(map[cid.Cid]struct{}), pendingWrites: make(map[cid.Cid]bool),
} }
ss.ctx, ss.cancel = context.WithCancel(context.Background()) ss.ctx, ss.cancel = context.WithCancel(context.Background())
@ -579,7 +578,7 @@ func (s *SplitStore) trackWrite(c cid.Cid, implicit bool) {
s.mx.Lock() s.mx.Lock()
defer s.mx.Unlock() defer s.mx.Unlock()
s.pendingWrites[c] = struct{}{} s.pendingWrites[c] = implicit
} }
// and also combine batch writes into them // and also combine batch writes into them
@ -588,7 +587,7 @@ func (s *SplitStore) trackWriteMany(cids []cid.Cid) {
defer s.mx.Unlock() defer s.mx.Unlock()
for _, c := range cids { for _, c := range cids {
s.pendingWrites[c] = struct{}{} s.pendingWrites[c] = false
} }
} }
@ -605,7 +604,7 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
cids := make([]cid.Cid, 0, len(s.pendingWrites)) cids := make([]cid.Cid, 0, len(s.pendingWrites))
seen := make(map[cid.Cid]struct{}) seen := make(map[cid.Cid]struct{})
walked := cid.NewSet() walked := cid.NewSet()
for c := range s.pendingWrites { for c, implicit := range s.pendingWrites {
_, ok := seen[c] _, ok := seen[c]
if ok { if ok {
continue continue
@ -614,7 +613,6 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
cids = append(cids, c) cids = append(cids, c)
seen[c] = struct{}{} seen[c] = struct{}{}
_, implicit := s.pendingWritesImplicit[c]
if !implicit { if !implicit {
continue continue
} }
@ -639,10 +637,7 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
} }
} }
s.pendingWrites = make(map[cid.Cid]struct{}) s.pendingWrites = make(map[cid.Cid]bool)
if len(s.pendingWritesImplicit) > 0 {
s.pendingWritesImplicit = make(map[cid.Cid]struct{})
}
epoch := s.writeEpoch epoch := s.writeEpoch
err := s.tracker.PutBatch(cids, epoch) err := s.tracker.PutBatch(cids, epoch)