diff --git a/blockstore/splitstore/markset_bolt.go b/blockstore/splitstore/markset_bolt.go index cab0dd74a..bac7673b8 100644 --- a/blockstore/splitstore/markset_bolt.go +++ b/blockstore/splitstore/markset_bolt.go @@ -1,6 +1,7 @@ package splitstore import ( + "sync" "time" "golang.org/x/xerrors" @@ -9,6 +10,8 @@ import ( bolt "go.etcd.io/bbolt" ) +const boltMarkSetStaging = 16384 + type BoltMarkSetEnv struct { db *bolt.DB } @@ -18,6 +21,10 @@ var _ MarkSetEnv = (*BoltMarkSetEnv)(nil) type BoltMarkSet struct { db *bolt.DB bucketId []byte + + // cache for batching + mx sync.RWMutex + pend map[string]struct{} } var _ MarkSet = (*BoltMarkSet)(nil) @@ -49,7 +56,11 @@ func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) { return nil, err } - return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil + return &BoltMarkSet{ + db: e.db, + bucketId: bucketId, + pend: make(map[string]struct{}), + }, nil } func (e *BoltMarkSetEnv) Close() error { @@ -57,16 +68,48 @@ func (e *BoltMarkSetEnv) Close() error { } func (s *BoltMarkSet) Mark(cid cid.Cid) error { - return s.db.Update(func(tx *bolt.Tx) error { + s.mx.Lock() + defer s.mx.Unlock() + + key := cid.Hash() + s.pend[string(key)] = struct{}{} + + if len(s.pend) < boltMarkSetStaging { + return nil + } + + err := s.db.Batch(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) - return b.Put(cid.Hash(), markBytes) + for key := range s.pend { + err := b.Put([]byte(key), markBytes) + if err != nil { + return err + } + } + return nil }) + + if err != nil { + return err + } + + s.pend = make(map[string]struct{}) + return nil } func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) { + s.mx.RLock() + defer s.mx.RUnlock() + + key := cid.Hash() + _, result = s.pend[string(key)] + if result { + return result, nil + } + err = s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) - v := b.Get(cid.Hash()) + v := b.Get(key) result = v != nil return nil })