lotus/blockstore/splitstore/tracking.go
2021-03-05 19:55:32 +02:00

110 lines
2.6 KiB
Go

package splitstore
import (
"path/filepath"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
cid "github.com/ipfs/go-cid"
)
// TrackingStore is a persistent store that tracks blocks that are added
// to the hotstore, tracking the epoch at which they are written.
type TrackingStore interface {
Put(cid.Cid, abi.ChainEpoch) error
PutBatch([]cid.Cid, abi.ChainEpoch) error
Get(cid.Cid) (abi.ChainEpoch, error)
Delete(cid.Cid) error
DeleteBatch([]cid.Cid) error
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
Sync() error
Close() error
}
// OpenTrackingStore opens a tracking store of the specified type in the
// specified path.
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
switch ttype {
case "", "bolt":
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
case "mem":
return NewMemTrackingStore(), nil
default:
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
}
}
// NewMemTrackingStore creates an in-memory tracking store.
// This is only useful for test or situations where you don't want to open the
// real tracking store (eg concurrent read only access on a node's datastore)
func NewMemTrackingStore() *MemTrackingStore {
return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)}
}
// MemTrackingStore is a simple in-memory tracking store
type MemTrackingStore struct {
sync.Mutex
tab map[cid.Cid]abi.ChainEpoch
}
var _ TrackingStore = (*MemTrackingStore)(nil)
func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
s.Lock()
defer s.Unlock()
s.tab[cid] = epoch
return nil
}
func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
s.Lock()
defer s.Unlock()
for _, cid := range cids {
s.tab[cid] = epoch
}
return nil
}
func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) {
s.Lock()
defer s.Unlock()
epoch, ok := s.tab[cid]
if ok {
return epoch, nil
}
return 0, xerrors.Errorf("missing tracking epoch for %s", cid)
}
func (s *MemTrackingStore) Delete(cid cid.Cid) error {
s.Lock()
defer s.Unlock()
delete(s.tab, cid)
return nil
}
func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error {
s.Lock()
defer s.Unlock()
for _, cid := range cids {
delete(s.tab, cid)
}
return nil
}
func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
s.Lock()
defer s.Unlock()
for cid, epoch := range s.tab {
err := f(cid, epoch)
if err != nil {
return err
}
}
return nil
}
func (s *MemTrackingStore) Sync() error { return nil }
func (s *MemTrackingStore) Close() error { return nil }