package splitstore import ( "time" "golang.org/x/xerrors" cid "github.com/ipfs/go-cid" bolt "go.etcd.io/bbolt" "github.com/filecoin-project/go-state-types/abi" ) type BoltTrackingStore struct { db *bolt.DB bucketId []byte } var _ TrackingStore = (*BoltTrackingStore)(nil) func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) { opts := &bolt.Options{ Timeout: 1 * time.Second, NoSync: true, } db, err := bolt.Open(path, 0644, opts) if err != nil { return nil, err } bucketId := []byte("tracker") err = db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists(bucketId) if err != nil { return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err) } return nil }) if err != nil { _ = db.Close() return nil, err } return &BoltTrackingStore{db: db, bucketId: bucketId}, nil } func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { val := epochToBytes(epoch) return s.db.Batch(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) return b.Put(cid.Hash(), val) }) } func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { val := epochToBytes(epoch) return s.db.Batch(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) for _, cid := range cids { err := b.Put(cid.Hash(), val) if err != nil { return err } } return nil }) } func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) { err = s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) val := b.Get(cid.Hash()) if val == nil { return xerrors.Errorf("missing tracking epoch for %s", cid) } epoch = bytesToEpoch(val) return nil }) return epoch, err } func (s *BoltTrackingStore) Delete(cid cid.Cid) error { return s.db.Batch(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) return b.Delete(cid.Hash()) }) } func (s *BoltTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error { return s.db.Batch(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) for cid := range cids { err := b.Delete(cid.Hash()) if err != nil { return xerrors.Errorf("error deleting %s", cid) } } return nil }) } func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { return s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucketId) return b.ForEach(func(k, v []byte) error { cid := cid.NewCidV1(cid.Raw, k) epoch := bytesToEpoch(v) return f(cid, epoch) }) }) } func (s *BoltTrackingStore) Sync() error { return s.db.Sync() } func (s *BoltTrackingStore) Close() error { return s.db.Close() }