243 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			243 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package backupds
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"crypto/sha256"
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/ipfs/go-datastore"
 | 
						|
	"github.com/ipfs/go-datastore/query"
 | 
						|
	logging "github.com/ipfs/go-log/v2"
 | 
						|
	cbg "github.com/whyrusleeping/cbor-gen"
 | 
						|
	"go.uber.org/multierr"
 | 
						|
	"golang.org/x/xerrors"
 | 
						|
)
 | 
						|
 | 
						|
var log = logging.Logger("backupds")
 | 
						|
 | 
						|
const NoLogdir = ""
 | 
						|
 | 
						|
type Datastore struct {
 | 
						|
	child datastore.Batching
 | 
						|
 | 
						|
	backupLk sync.RWMutex
 | 
						|
 | 
						|
	log             chan Entry
 | 
						|
	closing, closed chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
type Entry struct {
 | 
						|
	Key, Value []byte
 | 
						|
	Timestamp  int64
 | 
						|
}
 | 
						|
 | 
						|
func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
 | 
						|
	ds := &Datastore{
 | 
						|
		child: child,
 | 
						|
	}
 | 
						|
 | 
						|
	if logdir != NoLogdir {
 | 
						|
		ds.closing, ds.closed = make(chan struct{}), make(chan struct{})
 | 
						|
		ds.log = make(chan Entry)
 | 
						|
 | 
						|
		if err := ds.startLog(logdir); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return ds, nil
 | 
						|
}
 | 
						|
 | 
						|
// Writes a datastore dump into the provided writer as
 | 
						|
// [array(*) of [key, value] tuples, checksum]
 | 
						|
func (d *Datastore) Backup(ctx context.Context, out io.Writer) error {
 | 
						|
	scratch := make([]byte, 9)
 | 
						|
 | 
						|
	if err := cbg.WriteMajorTypeHeaderBuf(scratch, out, cbg.MajArray, 2); err != nil {
 | 
						|
		return xerrors.Errorf("writing tuple header: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	hasher := sha256.New()
 | 
						|
	hout := io.MultiWriter(hasher, out)
 | 
						|
 | 
						|
	// write KVs
 | 
						|
	{
 | 
						|
		// write indefinite length array header
 | 
						|
		if _, err := hout.Write([]byte{0x9f}); err != nil {
 | 
						|
			return xerrors.Errorf("writing header: %w", err)
 | 
						|
		}
 | 
						|
 | 
						|
		d.backupLk.Lock()
 | 
						|
		defer d.backupLk.Unlock()
 | 
						|
 | 
						|
		log.Info("Starting datastore backup")
 | 
						|
		defer log.Info("Datastore backup done")
 | 
						|
 | 
						|
		qr, err := d.child.Query(ctx, query.Query{})
 | 
						|
		if err != nil {
 | 
						|
			return xerrors.Errorf("query: %w", err)
 | 
						|
		}
 | 
						|
		defer func() {
 | 
						|
			if err := qr.Close(); err != nil {
 | 
						|
				log.Errorf("query close error: %+v", err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}()
 | 
						|
 | 
						|
		for result := range qr.Next() {
 | 
						|
			if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajArray, 2); err != nil {
 | 
						|
				return xerrors.Errorf("writing tuple header: %w", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len([]byte(result.Key)))); err != nil {
 | 
						|
				return xerrors.Errorf("writing key header: %w", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if _, err := hout.Write([]byte(result.Key)[:]); err != nil {
 | 
						|
				return xerrors.Errorf("writing key: %w", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(result.Value))); err != nil {
 | 
						|
				return xerrors.Errorf("writing value header: %w", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if _, err := hout.Write(result.Value[:]); err != nil {
 | 
						|
				return xerrors.Errorf("writing value: %w", err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// array break
 | 
						|
		if _, err := hout.Write([]byte{0xff}); err != nil {
 | 
						|
			return xerrors.Errorf("writing array 'break': %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Write the checksum
 | 
						|
	{
 | 
						|
		sum := hasher.Sum(nil)
 | 
						|
 | 
						|
		if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(sum))); err != nil {
 | 
						|
			return xerrors.Errorf("writing checksum header: %w", err)
 | 
						|
		}
 | 
						|
 | 
						|
		if _, err := hout.Write(sum[:]); err != nil {
 | 
						|
			return xerrors.Errorf("writing checksum: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// proxy
 | 
						|
 | 
						|
func (d *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error) {
 | 
						|
	return d.child.Get(ctx, key)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error) {
 | 
						|
	return d.child.Has(ctx, key)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error) {
 | 
						|
	return d.child.GetSize(ctx, key)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) {
 | 
						|
	return d.child.Query(ctx, q)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error {
 | 
						|
	d.backupLk.RLock()
 | 
						|
	defer d.backupLk.RUnlock()
 | 
						|
 | 
						|
	if d.log != nil {
 | 
						|
		d.log <- Entry{
 | 
						|
			Key:       []byte(key.String()),
 | 
						|
			Value:     value,
 | 
						|
			Timestamp: time.Now().Unix(),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return d.child.Put(ctx, key, value)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Delete(ctx context.Context, key datastore.Key) error {
 | 
						|
	d.backupLk.RLock()
 | 
						|
	defer d.backupLk.RUnlock()
 | 
						|
 | 
						|
	return d.child.Delete(ctx, key)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Sync(ctx context.Context, prefix datastore.Key) error {
 | 
						|
	d.backupLk.RLock()
 | 
						|
	defer d.backupLk.RUnlock()
 | 
						|
 | 
						|
	return d.child.Sync(ctx, prefix)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) CloseLog() error {
 | 
						|
	d.backupLk.RLock()
 | 
						|
	defer d.backupLk.RUnlock()
 | 
						|
 | 
						|
	if d.closing != nil {
 | 
						|
		close(d.closing)
 | 
						|
		<-d.closed
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Close() error {
 | 
						|
	return multierr.Combine(
 | 
						|
		d.child.Close(),
 | 
						|
		d.CloseLog(),
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func (d *Datastore) Batch(ctx context.Context) (datastore.Batch, error) {
 | 
						|
	b, err := d.child.Batch(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &bbatch{
 | 
						|
		d:   d,
 | 
						|
		b:   b,
 | 
						|
		rlk: d.backupLk.RLocker(),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
type bbatch struct {
 | 
						|
	d   *Datastore
 | 
						|
	b   datastore.Batch
 | 
						|
	rlk sync.Locker
 | 
						|
}
 | 
						|
 | 
						|
func (b *bbatch) Put(ctx context.Context, key datastore.Key, value []byte) error {
 | 
						|
	if b.d.log != nil {
 | 
						|
		b.d.log <- Entry{
 | 
						|
			Key:       []byte(key.String()),
 | 
						|
			Value:     value,
 | 
						|
			Timestamp: time.Now().Unix(),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return b.b.Put(ctx, key, value)
 | 
						|
}
 | 
						|
 | 
						|
func (b *bbatch) Delete(ctx context.Context, key datastore.Key) error {
 | 
						|
	return b.b.Delete(ctx, key)
 | 
						|
}
 | 
						|
 | 
						|
func (b *bbatch) Commit(ctx context.Context) error {
 | 
						|
	b.rlk.Lock()
 | 
						|
	defer b.rlk.Unlock()
 | 
						|
 | 
						|
	return b.b.Commit(ctx)
 | 
						|
}
 | 
						|
 | 
						|
var _ datastore.Batch = &bbatch{}
 | 
						|
var _ datastore.Batching = &Datastore{}
 |