From ca7e70bf3ad55449fd5a269312bddeb0fcc844f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 9 Mar 2021 22:33:01 +0100 Subject: [PATCH] Metadata datastore log --- cli/backup.go | 5 +- gen/main.go | 12 ++- lib/backupds/cbor_gen.go | 157 +++++++++++++++++++++++++++++++++++ lib/backupds/datastore.go | 68 +++++++++++++-- lib/backupds/log.go | 170 ++++++++++++++++++++++++++++++++++++++ lib/backupds/read.go | 43 ++++++++++ node/builder.go | 2 +- node/config/def.go | 5 ++ node/modules/storage.go | 34 ++++++-- 9 files changed, 478 insertions(+), 18 deletions(-) create mode 100644 lib/backupds/cbor_gen.go create mode 100644 lib/backupds/log.go diff --git a/cli/backup.go b/cli/backup.go index 1ee415727..856e098dd 100644 --- a/cli/backup.go +++ b/cli/backup.go @@ -51,7 +51,10 @@ func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Comma return xerrors.Errorf("getting metadata datastore: %w", err) } - bds := backupds.Wrap(mds) + bds, err := backupds.Wrap(mds, backupds.NoLogdir) + if err != nil { + return err + } fpath, err := homedir.Expand(cctx.Args().First()) if err != nil { diff --git a/gen/main.go b/gen/main.go index 9009172b9..61f41beec 100644 --- a/gen/main.go +++ b/gen/main.go @@ -4,15 +4,15 @@ import ( "fmt" "os" - "github.com/filecoin-project/lotus/chain/market" - gen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/exchange" + "github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/lib/backupds" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/paychmgr" ) @@ -105,4 +105,12 @@ func main() { fmt.Println(err) os.Exit(1) } + + err = gen.WriteTupleEncodersToFile("./lib/backupds/cbor_gen.go", "backupds", + 
backupds.Entry{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } diff --git a/lib/backupds/cbor_gen.go b/lib/backupds/cbor_gen.go new file mode 100644 index 000000000..d6cb6f4d3 --- /dev/null +++ b/lib/backupds/cbor_gen.go @@ -0,0 +1,157 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package backupds + +import ( + "fmt" + "io" + "sort" + + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = sort.Sort + +var lengthBufEntry = []byte{131} + +func (t *Entry) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write(lengthBufEntry); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Key ([]uint8) (slice) + if len(t.Key) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Key was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Key))); err != nil { + return err + } + + if _, err := w.Write(t.Key[:]); err != nil { + return err + } + + // t.Value ([]uint8) (slice) + if len(t.Value) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Value was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Value))); err != nil { + return err + } + + if _, err := w.Write(t.Value[:]); err != nil { + return err + } + + // t.Timestamp (int64) (int64) + if t.Timestamp >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Timestamp)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.Timestamp-1)); err != nil { + return err + } + } + return nil +} + +func (t *Entry) UnmarshalCBOR(r io.Reader) error { + *t = Entry{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, 
scratch) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Key ([]uint8) (slice) + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Key: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Key = make([]uint8, extra) + } + + if _, err := io.ReadFull(br, t.Key[:]); err != nil { + return err + } + // t.Value ([]uint8) (slice) + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Value: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Value = make([]uint8, extra) + } + + if _, err := io.ReadFull(br, t.Value[:]); err != nil { + return err + } + // t.Timestamp (int64) (int64) + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.Timestamp = int64(extraI) + } + return nil +} diff --git a/lib/backupds/datastore.go b/lib/backupds/datastore.go index 1555577f3..e349d9207 100644 --- a/lib/backupds/datastore.go +++ b/lib/backupds/datastore.go @@ -4,27 +4,49 @@ import ( "crypto/sha256" "io" "sync" + "time" - logging "github.com/ipfs/go-log/v2" - cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" 
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + logging "github.com/ipfs/go-log/v2" + cbg "github.com/whyrusleeping/cbor-gen" ) var log = logging.Logger("backupds") +const NoLogdir = "" + type Datastore struct { child datastore.Batching backupLk sync.RWMutex + + log chan Entry + closing, closed chan struct{} } -func Wrap(child datastore.Batching) *Datastore { - return &Datastore{ - child: child, +type Entry struct { + Key, Value []byte + Timestamp int64 +} + +func Wrap(child datastore.Batching, logdir string) (*Datastore, error) { + ds := &Datastore{ + child: child, } + + if logdir != NoLogdir { + ds.closing, ds.closed = make(chan struct{}), make(chan struct{}) + ds.log = make(chan Entry) + + if err := ds.startLog(logdir); err != nil { + return nil, err + } + } + + return ds, nil } // Writes a datastore dump into the provided writer as @@ -129,6 +151,14 @@ func (d *Datastore) Put(key datastore.Key, value []byte) error { d.backupLk.RLock() defer d.backupLk.RUnlock() + if d.log != nil { + d.log <- Entry{ + Key: []byte(key.String()), + Value: value, + Timestamp: time.Now().Unix(), + } + } + return d.child.Put(key, value) } @@ -146,11 +176,24 @@ func (d *Datastore) Sync(prefix datastore.Key) error { return d.child.Sync(prefix) } -func (d *Datastore) Close() error { +func (d *Datastore) CloseLog() error { d.backupLk.RLock() defer d.backupLk.RUnlock() - return d.child.Close() + if d.closing != nil { + close(d.closing) + <-d.closed + } + + return nil +} + +func (d *Datastore) Close() error { + if err := d.child.Close(); err != nil { + return xerrors.Errorf("closing child datastore: %w", err) + } + + return d.CloseLog() } func (d *Datastore) Batch() (datastore.Batch, error) { @@ -160,17 +203,28 @@ func (d *Datastore) Batch() (datastore.Batch, error) { } return &bbatch{ + d: d, b: b, rlk: d.backupLk.RLocker(), }, nil } type bbatch struct { + d *Datastore b datastore.Batch rlk sync.Locker } func (b *bbatch) Put(key datastore.Key, value []byte) error { + 
if b.d.log != nil {
+		b.d.log <- Entry{
+			Key: []byte(key.String()),
+			Value: value,
+			Timestamp: time.Now().Unix(),
+		}
+	}
+
+	return b.b.Put(key, value)
 }
diff --git a/lib/backupds/log.go b/lib/backupds/log.go
new file mode 100644
index 000000000..9a3e09e3b
--- /dev/null
+++ b/lib/backupds/log.go
@@ -0,0 +1,170 @@
+package backupds
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"golang.org/x/xerrors"
+
+	"github.com/ipfs/go-datastore"
+)
+
+func (d *Datastore) startLog(logdir string) error {
+	if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
+		return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
+	}
+
+	files, err := ioutil.ReadDir(logdir)
+	if err != nil {
+		return xerrors.Errorf("read logdir ('%s'): %w", logdir, err)
+	}
+
+	var latest string
+	var latestTs int64
+
+	for _, file := range files {
+		fn := file.Name()
+		if !strings.HasSuffix(fn, ".log.cbor") {
+			log.Warn("logfile with wrong file extension", fn)
+			continue
+		}
+		sec, err := strconv.ParseInt(fn[:len(fn)-len(".log.cbor")], 10, 64)
+		if err != nil {
+			return xerrors.Errorf("parsing logfile as a number: %w", err)
+		}
+
+		if sec > latestTs {
+			latestTs = sec
+			latest = file.Name()
+		}
+	}
+
+	var l *logfile
+	if latest == "" {
+		l, err = d.createLog(logdir)
+		if err != nil {
+			return xerrors.Errorf("creating log: %w", err)
+		}
+	} else {
+		l, err = openLog(filepath.Join(logdir, latest))
+		if err != nil {
+			return xerrors.Errorf("opening log: %w", err)
+		}
+	}
+
+
+	go func() {
+		defer close(d.closed)
+		for {
+			select {
+			case ent := <-d.log:
+				if err := l.writeEntry(&ent); err != nil {
+					log.Errorw("failed to write log entry", "error", err)
+					// todo try to do something, maybe start a new log file (but not when we're out of disk space)
+				}
+
+				// todo: batch writes when multiple are pending; flush on a timer
+				if err := l.file.Sync(); err != nil {
+					log.Errorw("failed to sync log", "error", err)
+				}
+			case <-d.closing:
+				if err := l.Close(); err != nil {
+					log.Errorw("failed to close log", "error", err)
+				}
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+type logfile struct {
+	file *os.File
+}
+
+func (d *Datastore) createLog(logdir string) (*logfile, error) {
+	p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
+	log.Infow("creating log", "file", p)
+
+	f, err := os.OpenFile(p, os.O_RDWR | os.O_CREATE | os.O_EXCL, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := d.Backup(f); err != nil {
+		return nil, xerrors.Errorf("writing log base: %w", err)
+	}
+	if err := f.Sync(); err != nil {
+		return nil, xerrors.Errorf("sync log base: %w", err)
+	}
+	log.Infow("log opened", "file", p)
+
+	// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
+
+	return &logfile{
+		file: f,
+	}, nil
+}
+
+func openLog(p string) (*logfile, error) {
+	log.Infow("opening log", "file", p)
+	f, err := os.OpenFile(p, os.O_RDWR, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	// check file integrity
+	err = ReadBackup(f, func(_ datastore.Key, _ []byte) error {
+		return nil
+	})
+	if err != nil {
+		return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
+	}
+	log.Infow("log opened", "file", p)
+
+
+	// make sure we're at the end of the file
+	at, err := f.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return nil, xerrors.Errorf("get current logfile offset: %w", err)
+	}
+	end, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return nil, xerrors.Errorf("get current logfile offset: %w", err)
+	}
+	if at != end {
+		return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
+	}
+
+	// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
+
+	return &logfile{
+		file: f,
+	}, nil
+}
+
+func (l *logfile) writeEntry(e *Entry) error {
+	// todo: maybe marshal to some temp buffer, then put into the
file? + if err := e.MarshalCBOR(l.file); err != nil { + return xerrors.Errorf("writing log entry: %w", err) + } + + return nil +} + +func (l *logfile) Close() error { + // todo: maybe write a magic 'close at' entry; pad the log to filesystem page to prevent more exotic types of corruption + + if err := l.file.Close(); err != nil { + return err + } + + l.file = nil + + return nil +} diff --git a/lib/backupds/read.go b/lib/backupds/read.go index f9a433637..303866273 100644 --- a/lib/backupds/read.go +++ b/lib/backupds/read.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "io" + "os" "github.com/ipfs/go-datastore" cbg "github.com/whyrusleeping/cbor-gen" @@ -13,6 +14,7 @@ import ( func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error { scratch := make([]byte, 9) + // read array[2]( if _, err := r.Read(scratch[:1]); err != nil { return xerrors.Errorf("reading array header: %w", err) } @@ -24,6 +26,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err hasher := sha256.New() hr := io.TeeReader(r, hasher) + // read array[*]( if _, err := hr.Read(scratch[:1]); err != nil { return xerrors.Errorf("reading array header: %w", err) } @@ -37,10 +40,12 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err return xerrors.Errorf("reading tuple header: %w", err) } + // close array[*] if scratch[0] == 0xff { break } + // read array[2](key:[]byte, value:[]byte) if scratch[0] != 0x82 { return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0]) } @@ -63,6 +68,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err sum := hasher.Sum(nil) + // read the [32]byte checksum expSum, err := cbg.ReadByteArray(r, 32) if err != nil { return xerrors.Errorf("reading expected checksum: %w", err) @@ -72,6 +78,43 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err return xerrors.Errorf("checksum didn't match; expected %x, got %x", 
expSum, sum) } + // read the log, set of Entry-ies + + var ent Entry + bp := cbg.GetPeeker(r) + for { + _, err := bp.ReadByte() + switch err { + case io.EOF, io.ErrUnexpectedEOF: + return nil + case nil: + default: + return xerrors.Errorf("peek log: %w", err) + } + if err := bp.UnreadByte(); err != nil { + return xerrors.Errorf("unread log byte: %w", err) + } + + if err := ent.UnmarshalCBOR(bp); err != nil { + switch err { + case io.EOF, io.ErrUnexpectedEOF: + if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" { + panic("handleme; just ignore and tell the caller about the corrupted file") // todo + } else { + return xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err) + } + default: + return xerrors.Errorf("unmarshaling log entry: %w", err) + } + } + + key := datastore.NewKey(string(ent.Key)) + + if err := cb(key, ent.Value); err != nil { + return err + } + } + return nil } diff --git a/node/builder.go b/node/builder.go index 47e685543..3484bd18b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -508,6 +508,7 @@ func ConfigCommon(cfg *config.Common) Option { Override(AddrsFactoryKey, lp2p.AddrsFactory( cfg.Libp2p.AnnounceAddresses, cfg.Libp2p.NoAnnounceAddresses)), + Override(new(dtypes.MetadataDS), modules.Datastore(cfg.Backup.DisableMetadataLog)), ) } @@ -601,7 +602,6 @@ func Repo(r repo.Repo) Option { return Options( Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing - Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), If(cfg.EnableSplitstore, diff --git a/node/config/def.go b/node/config/def.go index f5f293cbd..11f1a87e9 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -13,6 +13,7 @@ import ( // Common is common config between full node and miner type Common struct { API API + Backup Backup Libp2p Libp2p Pubsub Pubsub } @@ -29,6 +30,10 @@ type FullNode struct { // // Common +type Backup struct { 
+ DisableMetadataLog bool +} + // StorageMiner is a miner config type StorageMiner struct { Common diff --git a/node/modules/storage.go b/node/modules/storage.go index c0e9192f5..9f9b108c2 100644 --- a/node/modules/storage.go +++ b/node/modules/storage.go @@ -2,6 +2,8 @@ package modules import ( "context" + "golang.org/x/xerrors" + "path/filepath" "go.uber.org/fx" @@ -28,12 +30,30 @@ func KeyStore(lr repo.LockedRepo) (types.KeyStore, error) { return lr.KeyStore() } -func Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) { - ctx := helpers.LifecycleCtx(mctx, lc) - mds, err := r.Datastore(ctx, "/metadata") - if err != nil { - return nil, err - } +func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) { + return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + mds, err := r.Datastore(ctx, "/metadata") + if err != nil { + return nil, err + } - return backupds.Wrap(mds), nil + var logdir string + if !disableLog { + logdir = filepath.Join(r.Path(), "kvlog/metadata") + } + + bds, err := backupds.Wrap(mds, logdir) + if err != nil { + return nil, xerrors.Errorf("opening backupds: %w", err) + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bds.CloseLog() + }, + }) + + return bds, nil + } }