Merge pull request #5755 from filecoin-project/feat/kvlog

Metadata datastore log
This commit is contained in:
Łukasz Magiera 2021-03-10 14:13:01 +01:00 committed by GitHub
commit bd8864ae43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 528 additions and 18 deletions

View File

@ -51,7 +51,10 @@ func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Comma
return xerrors.Errorf("getting metadata datastore: %w", err) return xerrors.Errorf("getting metadata datastore: %w", err)
} }
bds := backupds.Wrap(mds) bds, err := backupds.Wrap(mds, backupds.NoLogdir)
if err != nil {
return err
}
fpath, err := homedir.Expand(cctx.Args().First()) fpath, err := homedir.Expand(cctx.Args().First())
if err != nil { if err != nil {

View File

@ -4,15 +4,15 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/filecoin-project/lotus/chain/market"
gen "github.com/whyrusleeping/cbor-gen" gen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/paychmgr" "github.com/filecoin-project/lotus/paychmgr"
) )
@ -105,4 +105,12 @@ func main() {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
err = gen.WriteTupleEncodersToFile("./lib/backupds/cbor_gen.go", "backupds",
backupds.Entry{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} }

157
lib/backupds/cbor_gen.go Normal file
View File

@ -0,0 +1,157 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package backupds
import (
"fmt"
"io"
"sort"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
var _ = cid.Undef
var _ = sort.Sort
var lengthBufEntry = []byte{131}
// MarshalCBOR writes t to w as a CBOR 3-element array: [Key, Value, Timestamp].
// Code generated by github.com/whyrusleeping/cbor-gen — regenerate via
// gen/main.go rather than hand-editing the logic.
func (t *Entry) MarshalCBOR(w io.Writer) error {
	if t == nil {
		// a nil Entry encodes as CBOR null
		_, err := w.Write(cbg.CborNull)
		return err
	}
	// array(3) header
	if _, err := w.Write(lengthBufEntry); err != nil {
		return err
	}

	// scratch buffer reused for all major-type headers (max header is 9 bytes)
	scratch := make([]byte, 9)

	// t.Key ([]uint8) (slice)
	if len(t.Key) > cbg.ByteArrayMaxLen {
		return xerrors.Errorf("Byte array in field t.Key was too long")
	}

	if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Key))); err != nil {
		return err
	}

	if _, err := w.Write(t.Key[:]); err != nil {
		return err
	}

	// t.Value ([]uint8) (slice)
	if len(t.Value) > cbg.ByteArrayMaxLen {
		return xerrors.Errorf("Byte array in field t.Value was too long")
	}

	if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.Value))); err != nil {
		return err
	}

	if _, err := w.Write(t.Value[:]); err != nil {
		return err
	}

	// t.Timestamp (int64) (int64)
	if t.Timestamp >= 0 {
		if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Timestamp)); err != nil {
			return err
		}
	} else {
		// CBOR negative ints encode n as -1 - n under the negative-int major type
		if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.Timestamp-1)); err != nil {
			return err
		}
	}
	return nil
}
// UnmarshalCBOR reads an Entry from r, expecting the 3-element array layout
// produced by MarshalCBOR: [Key, Value, Timestamp].
// Code generated by github.com/whyrusleeping/cbor-gen — regenerate via
// gen/main.go rather than hand-editing the logic.
func (t *Entry) UnmarshalCBOR(r io.Reader) error {
	// reset to the zero value before decoding
	*t = Entry{}

	br := cbg.GetPeeker(r)
	scratch := make([]byte, 8)

	// outer header must be array(3)
	maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
	if err != nil {
		return err
	}
	if maj != cbg.MajArray {
		return fmt.Errorf("cbor input should be of type array")
	}

	if extra != 3 {
		return fmt.Errorf("cbor input had wrong number of fields")
	}

	// t.Key ([]uint8) (slice)

	maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
	if err != nil {
		return err
	}

	if extra > cbg.ByteArrayMaxLen {
		return fmt.Errorf("t.Key: byte array too large (%d)", extra)
	}
	if maj != cbg.MajByteString {
		return fmt.Errorf("expected byte array")
	}

	if extra > 0 {
		t.Key = make([]uint8, extra)
	}

	if _, err := io.ReadFull(br, t.Key[:]); err != nil {
		return err
	}
	// t.Value ([]uint8) (slice)

	maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
	if err != nil {
		return err
	}

	if extra > cbg.ByteArrayMaxLen {
		return fmt.Errorf("t.Value: byte array too large (%d)", extra)
	}
	if maj != cbg.MajByteString {
		return fmt.Errorf("expected byte array")
	}

	if extra > 0 {
		t.Value = make([]uint8, extra)
	}

	if _, err := io.ReadFull(br, t.Value[:]); err != nil {
		return err
	}
	// t.Timestamp (int64) (int64)
	{
		maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
		var extraI int64
		if err != nil {
			return err
		}
		switch maj {
		case cbg.MajUnsignedInt:
			extraI = int64(extra)
			if extraI < 0 {
				// uint64 value exceeded int64 range
				return fmt.Errorf("int64 positive overflow")
			}
		case cbg.MajNegativeInt:
			extraI = int64(extra)
			if extraI < 0 {
				return fmt.Errorf("int64 negative oveflow")
			}
			// CBOR encodes negative n as -1 - n; undo that here
			extraI = -1 - extraI
		default:
			return fmt.Errorf("wrong type for int64 field: %d", maj)
		}

		t.Timestamp = int64(extraI)
	}
	return nil
}

View File

@ -4,27 +4,50 @@ import (
"crypto/sha256" "crypto/sha256"
"io" "io"
"sync" "sync"
"time"
logging "github.com/ipfs/go-log/v2" "go.uber.org/multierr"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
) )
var log = logging.Logger("backupds") var log = logging.Logger("backupds")
const NoLogdir = ""
type Datastore struct { type Datastore struct {
child datastore.Batching child datastore.Batching
backupLk sync.RWMutex backupLk sync.RWMutex
log chan Entry
closing, closed chan struct{}
} }
func Wrap(child datastore.Batching) *Datastore { type Entry struct {
return &Datastore{ Key, Value []byte
Timestamp int64
}
func Wrap(child datastore.Batching, logdir string) (*Datastore, error) {
ds := &Datastore{
child: child, child: child,
} }
if logdir != NoLogdir {
ds.closing, ds.closed = make(chan struct{}), make(chan struct{})
ds.log = make(chan Entry)
if err := ds.startLog(logdir); err != nil {
return nil, err
}
}
return ds, nil
} }
// Writes a datastore dump into the provided writer as // Writes a datastore dump into the provided writer as
@ -129,6 +152,14 @@ func (d *Datastore) Put(key datastore.Key, value []byte) error {
d.backupLk.RLock() d.backupLk.RLock()
defer d.backupLk.RUnlock() defer d.backupLk.RUnlock()
if d.log != nil {
d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
return d.child.Put(key, value) return d.child.Put(key, value)
} }
@ -146,11 +177,23 @@ func (d *Datastore) Sync(prefix datastore.Key) error {
return d.child.Sync(prefix) return d.child.Sync(prefix)
} }
func (d *Datastore) Close() error { func (d *Datastore) CloseLog() error {
d.backupLk.RLock() d.backupLk.RLock()
defer d.backupLk.RUnlock() defer d.backupLk.RUnlock()
return d.child.Close() if d.closing != nil {
close(d.closing)
<-d.closed
}
return nil
}
func (d *Datastore) Close() error {
return multierr.Combine(
d.child.Close(),
d.CloseLog(),
)
} }
func (d *Datastore) Batch() (datastore.Batch, error) { func (d *Datastore) Batch() (datastore.Batch, error) {
@ -160,17 +203,27 @@ func (d *Datastore) Batch() (datastore.Batch, error) {
} }
return &bbatch{ return &bbatch{
d: d,
b: b, b: b,
rlk: d.backupLk.RLocker(), rlk: d.backupLk.RLocker(),
}, nil }, nil
} }
type bbatch struct { type bbatch struct {
d *Datastore
b datastore.Batch b datastore.Batch
rlk sync.Locker rlk sync.Locker
} }
func (b *bbatch) Put(key datastore.Key, value []byte) error { func (b *bbatch) Put(key datastore.Key, value []byte) error {
if b.d.log != nil {
b.d.log <- Entry{
Key: []byte(key.String()),
Value: value,
Timestamp: time.Now().Unix(),
}
}
return b.b.Put(key, value) return b.b.Put(key, value)
} }

223
lib/backupds/log.go Normal file
View File

@ -0,0 +1,223 @@
package backupds
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
)
var loghead = datastore.NewKey("/backupds/log/head") // string([logfile base name];[uuid];[unix ts])
// startLog initializes the write-ahead log in logdir. It resumes the most
// recent existing logfile (named "<unix-ts>.log.cbor") if one exists, or
// creates a fresh one seeded with a full backup, writes a new log head, and
// then spawns runLog to service entries arriving on d.log.
func (d *Datastore) startLog(logdir string) error {
	if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
		return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
	}

	files, err := ioutil.ReadDir(logdir)
	if err != nil {
		return xerrors.Errorf("read logdir ('%s'): %w", logdir, err)
	}

	var latest string
	var latestTs int64

	for _, file := range files {
		fn := file.Name()
		if !strings.HasSuffix(fn, ".log.cbor") {
			log.Warn("logfile with wrong file extension", fn)
			continue
		}
		// The base name is a unix timestamp. BUGFIX: strip the ".log.cbor"
		// suffix; the previous fn[:len(".log.cbor")] sliced the FIRST nine
		// bytes of the name, truncating the timestamp (and failing outright
		// for short names).
		sec, err := strconv.ParseInt(strings.TrimSuffix(fn, ".log.cbor"), 10, 64)
		if err != nil {
			return xerrors.Errorf("parsing logfile as a number: %w", err)
		}

		if sec > latestTs {
			latestTs = sec
			latest = fn
		}
	}

	var l *logfile
	if latest == "" {
		// no usable logfile found; create one seeded with a full backup
		l, latest, err = d.createLog(logdir)
		if err != nil {
			return xerrors.Errorf("creating log: %w", err)
		}
	} else {
		// resume the newest logfile, verifying its integrity first
		l, err = d.openLog(filepath.Join(logdir, latest))
		if err != nil {
			return xerrors.Errorf("opening log: %w", err)
		}
	}

	// record a fresh head marker in both the log and the datastore so a
	// later open can verify they agree
	if err := l.writeLogHead(latest, d.child); err != nil {
		return xerrors.Errorf("writing new log head: %w", err)
	}

	go d.runLog(l)

	return nil
}
// runLog is the single writer goroutine for logfile l: it drains entries
// from d.log and appends them until d.closing is closed, then closes the
// log and signals completion by closing d.closed.
func (d *Datastore) runLog(l *logfile) {
	defer close(d.closed)

	for {
		select {
		case <-d.closing:
			if err := l.Close(); err != nil {
				log.Errorw("failed to close log", "error", err)
			}
			return
		case entry := <-d.log:
			if err := l.writeEntry(&entry); err != nil {
				log.Errorw("failed to write log entry", "error", err)
				// todo try to do something, maybe start a new log file (but not when we're out of disk space)
			}

			// todo: batch writes when multiple are pending; flush on a timer
			if err := l.file.Sync(); err != nil {
				log.Errorw("failed to sync log", "error", err)
			}
		}
	}
}
// logfile wraps the open file handle backing a single write-ahead logfile.
// All writes to it are serialized through the runLog goroutine.
type logfile struct {
	file *os.File
}
// createLog creates a brand-new logfile in logdir, named after the current
// unix timestamp, and seeds it with a full backup of the datastore so that
// replaying the file alone is enough to restore state. It returns the open
// logfile together with its base name.
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
	logPath := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
	log.Infow("creating log", "file", logPath)

	// O_EXCL: refuse to clobber an existing file with the same timestamp
	f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
	if err != nil {
		return nil, "", err
	}

	// the log begins with a complete datastore dump as its base
	if err := d.Backup(f); err != nil {
		return nil, "", xerrors.Errorf("writing log base: %w", err)
	}
	if err := f.Sync(); err != nil {
		return nil, "", xerrors.Errorf("sync log base: %w", err)
	}

	log.Infow("log opened", "file", logPath)

	return &logfile{file: f}, filepath.Base(logPath), nil
}
// openLog resumes an existing logfile at path p for appending, first
// verifying that it matches the head recorded in the datastore and that
// its contents decode cleanly all the way to EOF.
func (d *Datastore) openLog(p string) (*logfile, error) {
	log.Infow("opening log", "file", p)

	// the datastore remembers which logfile it last wrote a head for
	// ("[basename];[uuid];[unix ts]"); it must name the file we're opening
	lh, err := d.child.Get(loghead)
	if err != nil {
		return nil, xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
	}

	lhp := strings.Split(string(lh), ";")
	if len(lhp) != 3 {
		return nil, xerrors.Errorf("expected loghead to have 3 parts")
	}

	if lhp[0] != filepath.Base(p) {
		return nil, xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
	}

	f, err := os.OpenFile(p, os.O_RDWR, 0644)
	if err != nil {
		return nil, err
	}

	var lastLogHead string
	var openCount, logvals int64
	// check file integrity: replay the whole file (backup base plus log
	// entries), tracking the last head entry seen and how many times the
	// log has been (re)opened
	err = ReadBackup(f, func(k datastore.Key, v []byte) error {
		logvals++
		if k == loghead {
			lastLogHead = string(v)
			openCount++
		}
		return nil
	})
	if err != nil {
		return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
	}

	// the newest head inside the file must equal the head the datastore holds
	if string(lh) != lastLogHead {
		return nil, xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
	}

	// make sure we're at the end of the file — any trailing bytes the replay
	// didn't consume indicate corruption
	at, err := f.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, xerrors.Errorf("get current logfile offset: %w", err)
	}
	end, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, xerrors.Errorf("get current logfile offset: %w", err)
	}
	if at != end {
		return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
	}

	log.Infow("log opened", "file", p, "openCount", openCount, "logValues", logvals)

	// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption

	return &logfile{
		file: f,
	}, nil
}
// writeLogHead records a fresh head marker — "<logname>;<uuid>;<unix ts>" —
// both as an entry in the log itself and under the loghead key in the
// datastore, so a subsequent openLog can verify the two agree.
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
	headValue := []byte(fmt.Sprintf("%s;%s;%d", logname, uuid.New(), time.Now().Unix()))

	err := l.writeEntry(&Entry{
		Key:       loghead.Bytes(),
		Value:     headValue,
		Timestamp: time.Now().Unix(),
	})
	if err != nil {
		return xerrors.Errorf("writing loghead to the log: %w", err)
	}

	if err := ds.Put(loghead, headValue); err != nil {
		return xerrors.Errorf("writing loghead to the datastore: %w", err)
	}

	log.Infow("new log head", "loghead", string(headValue))

	return nil
}
// writeEntry appends a single CBOR-encoded entry to the logfile.
func (l *logfile) writeEntry(e *Entry) error {
	// todo: maybe marshal to some temp buffer, then put into the file?
	err := e.MarshalCBOR(l.file)
	if err != nil {
		return xerrors.Errorf("writing log entry: %w", err)
	}
	return nil
}
// Close closes the underlying file and nils the handle so further use of
// this logfile fails loudly.
func (l *logfile) Close() error {
	// todo: maybe write a magic 'close at' entry; pad the log to filesystem page to prevent more exotic types of corruption

	err := l.file.Close()
	if err != nil {
		return err
	}
	l.file = nil

	return nil
}

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"io" "io"
"os"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
@ -13,6 +14,7 @@ import (
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error { func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error {
scratch := make([]byte, 9) scratch := make([]byte, 9)
// read array[2](
if _, err := r.Read(scratch[:1]); err != nil { if _, err := r.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err) return xerrors.Errorf("reading array header: %w", err)
} }
@ -24,6 +26,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
hasher := sha256.New() hasher := sha256.New()
hr := io.TeeReader(r, hasher) hr := io.TeeReader(r, hasher)
// read array[*](
if _, err := hr.Read(scratch[:1]); err != nil { if _, err := hr.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err) return xerrors.Errorf("reading array header: %w", err)
} }
@ -37,10 +40,12 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
return xerrors.Errorf("reading tuple header: %w", err) return xerrors.Errorf("reading tuple header: %w", err)
} }
// close array[*]
if scratch[0] == 0xff { if scratch[0] == 0xff {
break break
} }
// read array[2](key:[]byte, value:[]byte)
if scratch[0] != 0x82 { if scratch[0] != 0x82 {
return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0]) return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0])
} }
@ -63,6 +68,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
sum := hasher.Sum(nil) sum := hasher.Sum(nil)
// read the [32]byte checksum
expSum, err := cbg.ReadByteArray(r, 32) expSum, err := cbg.ReadByteArray(r, 32)
if err != nil { if err != nil {
return xerrors.Errorf("reading expected checksum: %w", err) return xerrors.Errorf("reading expected checksum: %w", err)
@ -72,7 +78,42 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
return xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum) return xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
} }
return nil // read the log, set of Entry-ies
var ent Entry
bp := cbg.GetPeeker(r)
for {
_, err := bp.ReadByte()
switch err {
case io.EOF, io.ErrUnexpectedEOF:
return nil
case nil:
default:
return xerrors.Errorf("peek log: %w", err)
}
if err := bp.UnreadByte(); err != nil {
return xerrors.Errorf("unread log byte: %w", err)
}
if err := ent.UnmarshalCBOR(bp); err != nil {
switch err {
case io.EOF, io.ErrUnexpectedEOF:
if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" {
panic("handleme; just ignore and tell the caller about the corrupted file") // todo
} else {
return xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err)
}
default:
return xerrors.Errorf("unmarshaling log entry: %w", err)
}
}
key := datastore.NewKey(string(ent.Key))
if err := cb(key, ent.Value); err != nil {
return err
}
}
} }
func RestoreInto(r io.Reader, dest datastore.Batching) error { func RestoreInto(r io.Reader, dest datastore.Batching) error {

View File

@ -508,6 +508,7 @@ func ConfigCommon(cfg *config.Common) Option {
Override(AddrsFactoryKey, lp2p.AddrsFactory( Override(AddrsFactoryKey, lp2p.AddrsFactory(
cfg.Libp2p.AnnounceAddresses, cfg.Libp2p.AnnounceAddresses,
cfg.Libp2p.NoAnnounceAddresses)), cfg.Libp2p.NoAnnounceAddresses)),
Override(new(dtypes.MetadataDS), modules.Datastore(cfg.Backup.DisableMetadataLog)),
) )
} }
@ -601,7 +602,6 @@ func Repo(r repo.Repo) Option {
return Options( return Options(
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
If(cfg.EnableSplitstore, If(cfg.EnableSplitstore,

View File

@ -13,6 +13,7 @@ import (
// Common is common config between full node and miner // Common is common config between full node and miner
type Common struct { type Common struct {
API API API API
Backup Backup
Libp2p Libp2p Libp2p Libp2p
Pubsub Pubsub Pubsub Pubsub
} }
@ -29,6 +30,10 @@ type FullNode struct {
// // Common // // Common
type Backup struct {
DisableMetadataLog bool
}
// StorageMiner is a miner config // StorageMiner is a miner config
type StorageMiner struct { type StorageMiner struct {
Common Common

View File

@ -2,8 +2,10 @@ package modules
import ( import (
"context" "context"
"path/filepath"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/backupds" "github.com/filecoin-project/lotus/lib/backupds"
@ -28,12 +30,30 @@ func KeyStore(lr repo.LockedRepo) (types.KeyStore, error) {
return lr.KeyStore() return lr.KeyStore()
} }
func Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) { func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
ctx := helpers.LifecycleCtx(mctx, lc) return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
mds, err := r.Datastore(ctx, "/metadata") ctx := helpers.LifecycleCtx(mctx, lc)
if err != nil { mds, err := r.Datastore(ctx, "/metadata")
return nil, err if err != nil {
} return nil, err
}
return backupds.Wrap(mds), nil var logdir string
if !disableLog {
logdir = filepath.Join(r.Path(), "kvlog/metadata")
}
bds, err := backupds.Wrap(mds, logdir)
if err != nil {
return nil, xerrors.Errorf("opening backupds: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bds.CloseLog()
},
})
return bds, nil
}
} }