lotus/lib/backupds/datastore.go

243 lines
4.9 KiB
Go
Raw Normal View History

2020-10-01 11:58:26 +00:00
package backupds
import (
2020-10-05 23:50:43 +00:00
"crypto/sha256"
2020-10-01 11:58:26 +00:00
"io"
"sync"
2021-03-09 21:33:01 +00:00
"time"
2020-10-01 11:58:26 +00:00
2021-03-10 12:58:02 +00:00
"go.uber.org/multierr"
2020-10-01 11:58:26 +00:00
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
2021-03-09 21:33:01 +00:00
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
2020-10-01 11:58:26 +00:00
)
// log is the package-level logger, created under the name "backupds".
var log = logging.Logger("backupds")
2021-03-09 21:33:01 +00:00
// NoLogdir is the logdir value that makes Wrap skip setting up the write log.
const NoLogdir = ""
2020-10-01 11:58:26 +00:00
type Datastore struct {
child datastore.Batching
backupLk sync.RWMutex
2021-03-09 21:33:01 +00:00
2021-03-09 21:33:12 +00:00
log chan Entry
2021-03-09 21:33:01 +00:00
closing, closed chan struct{}
}
// Entry is a single logged write: the key and value handed to Put together
// with the time at which the Put was issued.
type Entry struct {
	// Key holds the bytes of the datastore key's string form.
	Key []byte
	// Value holds the value passed to Put.
	Value []byte
	// Timestamp is the Unix time, in seconds, taken when the entry was created.
	Timestamp int64
}
2021-03-09 21:33:01 +00:00
func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
ds := &Datastore{
2021-03-09 21:33:12 +00:00
child: child,
2020-10-01 11:58:26 +00:00
}
2021-03-09 21:33:01 +00:00
if logdir != NoLogdir {
ds.closing, ds.closed = make(chan struct{}), make(chan struct{})
ds.log = make(chan Entry)
if err := ds.startLog(logdir); err != nil {
return nil, err
}
}
return ds, nil
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
// Writes a datastore dump into the provided writer as
// [array(*) of [key, value] tuples, checksum]
2020-10-01 11:58:26 +00:00
func (d *Datastore) Backup(out io.Writer) error {
2020-10-05 23:50:43 +00:00
scratch := make([]byte, 9)
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
if err := cbg.WriteMajorTypeHeaderBuf(scratch, out, cbg.MajArray, 2); err != nil {
return xerrors.Errorf("writing tuple header: %w", err)
}
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
hasher := sha256.New()
hout := io.MultiWriter(hasher, out)
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
// write KVs
{
// write indefinite length array header
if _, err := hout.Write([]byte{0x9f}); err != nil {
return xerrors.Errorf("writing header: %w", err)
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
d.backupLk.Lock()
defer d.backupLk.Unlock()
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
log.Info("Starting datastore backup")
defer log.Info("Datastore backup done")
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
qr, err := d.child.Query(query.Query{})
if err != nil {
return xerrors.Errorf("query: %w", err)
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
defer func() {
if err := qr.Close(); err != nil {
log.Errorf("query close error: %+v", err)
return
}
}()
for result := range qr.Next() {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajArray, 2); err != nil {
return xerrors.Errorf("writing tuple header: %w", err)
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len([]byte(result.Key)))); err != nil {
return xerrors.Errorf("writing key header: %w", err)
}
if _, err := hout.Write([]byte(result.Key)[:]); err != nil {
return xerrors.Errorf("writing key: %w", err)
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(result.Value))); err != nil {
return xerrors.Errorf("writing value header: %w", err)
}
if _, err := hout.Write(result.Value[:]); err != nil {
return xerrors.Errorf("writing value: %w", err)
}
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
// array break
if _, err := hout.Write([]byte{0xff}); err != nil {
return xerrors.Errorf("writing array 'break': %w", err)
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
}
// Write the checksum
{
sum := hasher.Sum(nil)
2020-10-01 11:58:26 +00:00
2020-10-05 23:50:43 +00:00
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(sum))); err != nil {
return xerrors.Errorf("writing checksum header: %w", err)
2020-10-01 11:58:26 +00:00
}
2020-10-05 23:50:43 +00:00
if _, err := hout.Write(sum[:]); err != nil {
return xerrors.Errorf("writing checksum: %w", err)
}
2020-10-01 11:58:26 +00:00
}
return nil
}
// proxy
// Get returns the value stored under key in the wrapped datastore.
// It does not take the backup lock.
func (d *Datastore) Get(key datastore.Key) (value []byte, err error) {
	value, err = d.child.Get(key)
	return value, err
}
// Has reports whether key is present in the wrapped datastore.
// It does not take the backup lock.
func (d *Datastore) Has(key datastore.Key) (exists bool, err error) {
	exists, err = d.child.Has(key)
	return exists, err
}
// GetSize returns the size reported by the wrapped datastore for key.
// It does not take the backup lock.
func (d *Datastore) GetSize(key datastore.Key) (size int, err error) {
	size, err = d.child.GetSize(key)
	return size, err
}
// Query runs q against the wrapped datastore and returns its results.
// It does not take the backup lock.
func (d *Datastore) Query(q query.Query) (query.Results, error) {
	results, err := d.child.Query(q)
	return results, err
}
func (d *Datastore) Put(key datastore.Key, value []byte) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
2021-03-09 21:33:01 +00:00
if d.log != nil {
d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
2020-10-01 11:58:26 +00:00
return d.child.Put(key, value)
}
// Delete removes key from the wrapped datastore.
//
// The backup lock is held for reading, so Delete blocks while a Backup is
// running. Nothing is sent to the write log.
func (d *Datastore) Delete(key datastore.Key) error {
	d.backupLk.RLock()
	defer d.backupLk.RUnlock()

	err := d.child.Delete(key)
	return err
}
// Sync forwards a sync request for prefix to the wrapped datastore.
//
// The backup lock is held for reading, so Sync blocks while a Backup is
// running.
func (d *Datastore) Sync(prefix datastore.Key) error {
	d.backupLk.RLock()
	defer d.backupLk.RUnlock()

	err := d.child.Sync(prefix)
	return err
}
2021-03-09 21:33:01 +00:00
func (d *Datastore) CloseLog() error {
2020-10-01 11:58:26 +00:00
d.backupLk.RLock()
defer d.backupLk.RUnlock()
2021-03-09 21:33:01 +00:00
if d.closing != nil {
close(d.closing)
<-d.closed
}
return nil
}
func (d *Datastore) Close() error {
2021-03-10 12:58:02 +00:00
return multierr.Combine(
d.child.Close(),
d.CloseLog(),
)
2020-10-01 11:58:26 +00:00
}
func (d *Datastore) Batch() (datastore.Batch, error) {
b, err := d.child.Batch()
if err != nil {
return nil, err
}
return &bbatch{
2021-03-09 21:33:01 +00:00
d: d,
2020-10-01 11:58:26 +00:00
b: b,
rlk: d.backupLk.RLocker(),
}, nil
}
type bbatch struct {
2021-03-09 21:33:01 +00:00
d *Datastore
2020-10-01 11:58:26 +00:00
b datastore.Batch
rlk sync.Locker
}
func (b *bbatch) Put(key datastore.Key, value []byte) error {
2021-03-09 21:33:01 +00:00
if b.d.log != nil {
b.d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
2020-10-01 11:58:26 +00:00
return b.b.Put(key, value)
}
// Delete queues removal of key on the underlying batch. Nothing is logged and
// the backup lock is not taken.
func (b *bbatch) Delete(key datastore.Key) error {
	err := b.b.Delete(key)
	return err
}
// Commit commits the underlying batch while holding the read side of the
// backup lock, so it blocks while a Backup is running.
func (b *bbatch) Commit() error {
	b.rlk.Lock()
	defer b.rlk.Unlock()

	err := b.b.Commit()
	return err
}
// Compile-time checks that the types in this file satisfy the datastore
// interfaces they are used as.
var (
	_ datastore.Batch    = (*bbatch)(nil)
	_ datastore.Batching = (*Datastore)(nil)
)