275 lines
4.8 KiB
Go
275 lines
4.8 KiB
Go
package splitstore
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"golang.org/x/xerrors"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
)
|
|
|
|
type MapMarkSetEnv struct {
|
|
path string
|
|
}
|
|
|
|
var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
|
|
|
|
type MapMarkSet struct {
|
|
mx sync.RWMutex
|
|
set map[string]struct{}
|
|
|
|
persist bool
|
|
file *os.File
|
|
buf *bufio.Writer
|
|
|
|
path string
|
|
}
|
|
|
|
var _ MarkSet = (*MapMarkSet)(nil)
|
|
|
|
func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {
|
|
msPath := filepath.Join(path, "markset.map")
|
|
err := os.MkdirAll(msPath, 0755) //nolint:gosec
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("error creating markset directory: %w", err)
|
|
}
|
|
|
|
return &MapMarkSetEnv{path: msPath}, nil
|
|
}
|
|
|
|
func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
|
|
path := filepath.Join(e.path, name)
|
|
return &MapMarkSet{
|
|
set: make(map[string]struct{}, sizeHint),
|
|
path: path,
|
|
}, nil
|
|
}
|
|
|
|
func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) {
|
|
path := filepath.Join(e.path, name)
|
|
s := &MapMarkSet{
|
|
set: make(map[string]struct{}),
|
|
path: path,
|
|
}
|
|
|
|
in, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("error opening markset file for read: %w", err)
|
|
}
|
|
defer in.Close()
|
|
|
|
// wrap a buffered reader to make this faster
|
|
buf := bufio.NewReader(in)
|
|
for {
|
|
var sz byte
|
|
if sz, err = buf.ReadByte(); err != nil {
|
|
break
|
|
}
|
|
|
|
key := make([]byte, int(sz))
|
|
if _, err = buf.Read(key); err != nil {
|
|
break
|
|
}
|
|
|
|
s.set[string(key)] = struct{}{}
|
|
}
|
|
|
|
if err != io.EOF {
|
|
return nil, xerrors.Errorf("error reading markset file: %w", err)
|
|
}
|
|
|
|
file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_APPEND, 0)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("error opening markset file for write: %w", err)
|
|
}
|
|
|
|
s.persist = true
|
|
s.file = file
|
|
s.buf = bufio.NewWriter(file)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (e *MapMarkSetEnv) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func (s *MapMarkSet) BeginCriticalSection() error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.set == nil {
|
|
return errMarkSetClosed
|
|
}
|
|
|
|
if s.persist {
|
|
return nil
|
|
}
|
|
|
|
file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
return xerrors.Errorf("error opening markset file: %w", err)
|
|
}
|
|
|
|
// wrap a buffered writer to make this faster
|
|
s.buf = bufio.NewWriter(file)
|
|
for key := range s.set {
|
|
if err := s.writeKey([]byte(key), false); err != nil {
|
|
_ = file.Close()
|
|
s.buf = nil
|
|
return err
|
|
}
|
|
}
|
|
if err := s.buf.Flush(); err != nil {
|
|
_ = file.Close()
|
|
s.buf = nil
|
|
return xerrors.Errorf("error flushing markset file buffer: %w", err)
|
|
}
|
|
|
|
s.file = file
|
|
s.persist = true
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MapMarkSet) EndCriticalSection() {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if !s.persist {
|
|
return
|
|
}
|
|
|
|
_ = s.file.Close()
|
|
_ = os.Remove(s.path)
|
|
s.file = nil
|
|
s.buf = nil
|
|
s.persist = false
|
|
}
|
|
|
|
func (s *MapMarkSet) Mark(c cid.Cid) error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.set == nil {
|
|
return errMarkSetClosed
|
|
}
|
|
|
|
hash := c.Hash()
|
|
s.set[string(hash)] = struct{}{}
|
|
|
|
if s.persist {
|
|
return s.writeKey(hash, true)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MapMarkSet) MarkMany(batch []cid.Cid) error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.set == nil {
|
|
return errMarkSetClosed
|
|
}
|
|
|
|
for _, c := range batch {
|
|
hash := c.Hash()
|
|
s.set[string(hash)] = struct{}{}
|
|
|
|
if s.persist {
|
|
if err := s.writeKey(hash, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if s.persist {
|
|
return s.buf.Flush()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
|
|
s.mx.RLock()
|
|
defer s.mx.RUnlock()
|
|
|
|
if s.set == nil {
|
|
return false, errMarkSetClosed
|
|
}
|
|
|
|
_, ok := s.set[string(cid.Hash())]
|
|
return ok, nil
|
|
}
|
|
|
|
func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.set == nil {
|
|
return false, errMarkSetClosed
|
|
}
|
|
|
|
hash := c.Hash()
|
|
key := string(hash)
|
|
if _, ok := s.set[key]; ok {
|
|
return false, nil
|
|
}
|
|
|
|
s.set[key] = struct{}{}
|
|
|
|
if s.persist {
|
|
if err := s.writeKey(hash, true); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *MapMarkSet) Close() error {
|
|
s.mx.Lock()
|
|
defer s.mx.Unlock()
|
|
|
|
if s.set == nil {
|
|
return nil
|
|
}
|
|
|
|
s.set = nil
|
|
|
|
if s.file != nil {
|
|
if err := s.file.Close(); err != nil {
|
|
log.Warnf("error closing markset file: %s", err)
|
|
}
|
|
|
|
if !s.persist {
|
|
if err := os.Remove(s.path); err != nil {
|
|
log.Warnf("error removing markset file: %s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MapMarkSet) writeKey(k []byte, flush bool) error {
|
|
if err := s.buf.WriteByte(byte(len(k))); err != nil {
|
|
return xerrors.Errorf("error writing markset key length to disk: %w", err)
|
|
}
|
|
if _, err := s.buf.Write(k); err != nil {
|
|
return xerrors.Errorf("error writing markset key to disk: %w", err)
|
|
}
|
|
if flush {
|
|
if err := s.buf.Flush(); err != nil {
|
|
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|