Metadata datastore log

This commit is contained in:
Łukasz Magiera 2021-03-09 22:33:01 +01:00
parent 1c62d7a40f
commit ca7e70bf3a
9 changed files with 478 additions and 18 deletions

View File

@ -51,7 +51,10 @@ func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Comma
return xerrors.Errorf("getting metadata datastore: %w", err)
}
bds := backupds.Wrap(mds)
bds, err := backupds.Wrap(mds, backupds.NoLogdir)
if err != nil {
return err
}
fpath, err := homedir.Expand(cctx.Args().First())
if err != nil {

View File

@ -4,15 +4,15 @@ import (
"fmt"
"os"
"github.com/filecoin-project/lotus/chain/market"
gen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/paychmgr"
)
@ -105,4 +105,12 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./lib/backupds/cbor_gen.go", "backupds",
backupds.Entry{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

157
lib/backupds/cbor_gen.go Normal file
View File

@ -0,0 +1,157 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package backupds
import (
"fmt"
"io"
"sort"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
var _ = cid.Undef
var _ = sort.Sort
var lengthBufEntry = []byte{131}
func (t *Entry) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write(lengthBufEntry); err != nil {
return err
}
scratch := make([]byte, 9)
// t.Key ([]uint8) (slice)
if len(t.Key) > cbg.ByteArrayMaxLen {
return xerrors.Errorf("Byte array in field t.Key was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Key))); err != nil {
return err
}
if _, err := w.Write(t.Key[:]); err != nil {
return err
}
// t.Value ([]uint8) (slice)
if len(t.Value) > cbg.ByteArrayMaxLen {
return xerrors.Errorf("Byte array in field t.Value was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Value))); err != nil {
return err
}
if _, err := w.Write(t.Value[:]); err != nil {
return err
}
// t.Timestamp (int64) (int64)
if t.Timestamp >= 0 {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Timestamp)); err != nil {
return err
}
} else {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.Timestamp-1)); err != nil {
return err
}
}
return nil
}
func (t *Entry) UnmarshalCBOR(r io.Reader) error {
*t = Entry{}
br := cbg.GetPeeker(r)
scratch := make([]byte, 8)
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Key ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Key: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
if extra > 0 {
t.Key = make([]uint8, extra)
}
if _, err := io.ReadFull(br, t.Key[:]); err != nil {
return err
}
// t.Value ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Value: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
if extra > 0 {
t.Value = make([]uint8, extra)
}
if _, err := io.ReadFull(br, t.Value[:]); err != nil {
return err
}
// t.Timestamp (int64) (int64)
{
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
var extraI int64
if err != nil {
return err
}
switch maj {
case cbg.MajUnsignedInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 positive overflow")
}
case cbg.MajNegativeInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 negative oveflow")
}
extraI = -1 - extraI
default:
return fmt.Errorf("wrong type for int64 field: %d", maj)
}
t.Timestamp = int64(extraI)
}
return nil
}

View File

@ -4,27 +4,49 @@ import (
"crypto/sha256"
"io"
"sync"
"time"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
)
var log = logging.Logger("backupds")
const NoLogdir = ""
type Datastore struct {
child datastore.Batching
backupLk sync.RWMutex
log chan Entry
closing, closed chan struct{}
}
func Wrap(child datastore.Batching) *Datastore {
return &Datastore{
child: child,
type Entry struct {
Key, Value []byte
Timestamp int64
}
func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
ds := &Datastore{
child: child,
}
if logdir != NoLogdir {
ds.closing, ds.closed = make(chan struct{}), make(chan struct{})
ds.log = make(chan Entry)
if err := ds.startLog(logdir); err != nil {
return nil, err
}
}
return ds, nil
}
// Writes a datastore dump into the provided writer as
@ -129,6 +151,14 @@ func (d *Datastore) Put(key datastore.Key, value []byte) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
if d.log != nil {
d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
return d.child.Put(key, value)
}
@ -146,11 +176,24 @@ func (d *Datastore) Sync(prefix datastore.Key) error {
return d.child.Sync(prefix)
}
func (d *Datastore) Close() error {
func (d *Datastore) CloseLog() error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Close()
if d.closing != nil {
close(d.closing)
<-d.closed
}
return nil
}
func (d *Datastore) Close() error {
if err := d.child.Close(); err != nil {
return xerrors.Errorf("closing child datastore: %w", err)
}
return d.CloseLog()
}
func (d *Datastore) Batch() (datastore.Batch, error) {
@ -160,17 +203,28 @@ func (d *Datastore) Batch() (datastore.Batch, error) {
}
return &bbatch{
d: d,
b: b,
rlk: d.backupLk.RLocker(),
}, nil
}
type bbatch struct {
d *Datastore
b datastore.Batch
rlk sync.Locker
}
func (b *bbatch) Put(key datastore.Key, value []byte) error {
if b.d.log != nil {
b.d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
return b.b.Put(key, value)
}

170
lib/backupds/log.go Normal file
View File

@ -0,0 +1,170 @@
package backupds
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
)
func (d *Datastore) startLog(logdir string) error {
if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
}
files, err := ioutil.ReadDir(logdir)
if err != nil {
return xerrors.Errorf("read logdir ('%s'): %w", logdir, err)
}
var latest string
var latestTs int64
for _, file := range files {
fn := file.Name()
if !strings.HasSuffix(fn, ".log.cbor") {
log.Warn("logfile with wrong file extension", fn)
continue
}
sec, err := strconv.ParseInt(fn[:len(".log.cbor")], 10, 64)
if err != nil {
return xerrors.Errorf("parsing logfile as a number: %w", err)
}
if sec > latestTs {
latestTs = sec
latest = file.Name()
}
}
var l *logfile
if latest == "" {
l, err = d.createLog(logdir)
if err != nil {
return xerrors.Errorf("creating log: %w", err)
}
} else {
l, err = openLog(filepath.Join(logdir, latest))
if err != nil {
return xerrors.Errorf("opening log: %w", err)
}
}
go func() {
defer close(d.closed)
for {
select {
case ent := <-d.log:
if err := l.writeEntry(&ent); err != nil {
log.Errorw("failed to write log entry", "error", err)
// todo try to do something, maybe start a new log file (but not when we're out of disk space)
}
// todo: batch writes when multiple are pending; flush on a timer
if err := l.file.Sync(); err != nil {
log.Errorw("failed to sync log", "error", err)
}
case <-d.closing:
if err := l.Close(); err != nil {
log.Errorw("failed to close log", "error", err)
}
}
}
}()
return nil
}
type logfile struct {
file *os.File
}
func (d *Datastore) createLog(logdir string) (*logfile, error) {
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
log.Infow("creating log", "file", p)
f, err := os.OpenFile(p, os.O_RDWR | os.O_CREATE | os.O_EXCL, 0644)
if err != nil {
return nil, err
}
if err := d.Backup(f); err != nil {
return nil, xerrors.Errorf("writing log base: %w", err)
}
if err := f.Sync(); err != nil {
return nil, xerrors.Errorf("sync log base: %w", err)
}
log.Infow("log opened", "file", p)
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
return &logfile{
file: f,
}, nil
}
func openLog(p string) (*logfile, error) {
log.Infow("opening log", "file", p)
f, err := os.OpenFile(p, os.O_RDWR, 0644)
if err != nil {
return nil, err
}
// check file integrity
err = ReadBackup(f, func(_ datastore.Key, _ []byte) error {
return nil
})
if err != nil {
return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
}
log.Infow("log opened", "file", p)
// make sure we're at the end of the file
at, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, xerrors.Errorf("get current logfile offset: %w", err)
}
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
return nil, xerrors.Errorf("get current logfile offset: %w", err)
}
if at != end {
return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
}
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
return &logfile{
file: f,
}, nil
}
func (l *logfile) writeEntry(e *Entry) error {
// todo: maybe marshal to some temp buffer, then put into the file?
if err := e.MarshalCBOR(l.file); err != nil {
return xerrors.Errorf("writing log entry: %w", err)
}
return nil
}
func (l *logfile) Close() error {
// todo: maybe write a magic 'close at' entry; pad the log to filesystem page to prevent more exotic types of corruption
if err := l.file.Close(); err != nil {
return err
}
l.file = nil
return nil
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"io"
"os"
"github.com/ipfs/go-datastore"
cbg "github.com/whyrusleeping/cbor-gen"
@ -13,6 +14,7 @@ import (
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error {
scratch := make([]byte, 9)
// read array[2](
if _, err := r.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err)
}
@ -24,6 +26,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
hasher := sha256.New()
hr := io.TeeReader(r, hasher)
// read array[*](
if _, err := hr.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err)
}
@ -37,10 +40,12 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
return xerrors.Errorf("reading tuple header: %w", err)
}
// close array[*]
if scratch[0] == 0xff {
break
}
// read array[2](key:[]byte, value:[]byte)
if scratch[0] != 0x82 {
return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0])
}
@ -63,6 +68,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
sum := hasher.Sum(nil)
// read the [32]byte checksum
expSum, err := cbg.ReadByteArray(r, 32)
if err != nil {
return xerrors.Errorf("reading expected checksum: %w", err)
@ -72,6 +78,43 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
return xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
}
// read the log, set of Entry-ies
var ent Entry
bp := cbg.GetPeeker(r)
for {
_, err := bp.ReadByte()
switch err {
case io.EOF, io.ErrUnexpectedEOF:
return nil
case nil:
default:
return xerrors.Errorf("peek log: %w", err)
}
if err := bp.UnreadByte(); err != nil {
return xerrors.Errorf("unread log byte: %w", err)
}
if err := ent.UnmarshalCBOR(bp); err != nil {
switch err {
case io.EOF, io.ErrUnexpectedEOF:
if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" {
panic("handleme; just ignore and tell the caller about the corrupted file") // todo
} else {
return xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err)
}
default:
return xerrors.Errorf("unmarshaling log entry: %w", err)
}
}
key := datastore.NewKey(string(ent.Key))
if err := cb(key, ent.Value); err != nil {
return err
}
}
return nil
}

View File

@ -508,6 +508,7 @@ func ConfigCommon(cfg *config.Common) Option {
Override(AddrsFactoryKey, lp2p.AddrsFactory(
cfg.Libp2p.AnnounceAddresses,
cfg.Libp2p.NoAnnounceAddresses)),
Override(new(dtypes.MetadataDS), modules.Datastore(cfg.Backup.DisableMetadataLog)),
)
}
@ -601,7 +602,6 @@ func Repo(r repo.Repo) Option {
return Options(
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
If(cfg.EnableSplitstore,

View File

@ -13,6 +13,7 @@ import (
// Common is common config between full node and miner
type Common struct {
API API
Backup Backup
Libp2p Libp2p
Pubsub Pubsub
}
@ -29,6 +30,10 @@ type FullNode struct {
// // Common
type Backup struct {
DisableMetadataLog bool
}
// StorageMiner is a miner config
type StorageMiner struct {
Common

View File

@ -2,6 +2,8 @@ package modules
import (
"context"
"golang.org/x/xerrors"
"path/filepath"
"go.uber.org/fx"
@ -28,12 +30,30 @@ func KeyStore(lr repo.LockedRepo) (types.KeyStore, error) {
return lr.KeyStore()
}
func Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
mds, err := r.Datastore(ctx, "/metadata")
if err != nil {
return nil, err
}
func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
mds, err := r.Datastore(ctx, "/metadata")
if err != nil {
return nil, err
}
return backupds.Wrap(mds), nil
var logdir string
if !disableLog {
logdir = filepath.Join(r.Path(), "kvlog/metadata")
}
bds, err := backupds.Wrap(mds, logdir)
if err != nil {
return nil, xerrors.Errorf("opening backupds: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bds.CloseLog()
},
})
return bds, nil
}
}