Merge pull request #5875 from filecoin-project/feat/log-compact-restart
backupds: Compact log on restart
This commit is contained in:
commit
8f78066d4f
@ -180,8 +180,11 @@ var datastoreBackupStatCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
defer f.Close() // nolint:errcheck
|
defer f.Close() // nolint:errcheck
|
||||||
|
|
||||||
var keys, kbytes, vbytes uint64
|
var keys, logs, kbytes, vbytes uint64
|
||||||
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
|
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, log bool) error {
|
||||||
|
if log {
|
||||||
|
logs++
|
||||||
|
}
|
||||||
keys++
|
keys++
|
||||||
kbytes += uint64(len(key.String()))
|
kbytes += uint64(len(key.String()))
|
||||||
vbytes += uint64(len(value))
|
vbytes += uint64(len(value))
|
||||||
@ -192,6 +195,7 @@ var datastoreBackupStatCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Keys: ", keys)
|
fmt.Println("Keys: ", keys)
|
||||||
|
fmt.Println("Log values: ", log)
|
||||||
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
|
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
|
||||||
fmt.Println("Value bytes: ", units.BytesSize(float64(vbytes)))
|
fmt.Println("Value bytes: ", units.BytesSize(float64(vbytes)))
|
||||||
|
|
||||||
@ -225,7 +229,7 @@ var datastoreBackupListCmd = &cli.Command{
|
|||||||
defer f.Close() // nolint:errcheck
|
defer f.Close() // nolint:errcheck
|
||||||
|
|
||||||
printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
|
printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
|
||||||
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
|
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, _ bool) error {
|
||||||
return printKv(key.String(), value)
|
return printKv(key.String(), value)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -55,7 +55,7 @@ func (d *Datastore) startLog(logdir string) error {
|
|||||||
return xerrors.Errorf("creating log: %w", err)
|
return xerrors.Errorf("creating log: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
l, err = d.openLog(filepath.Join(logdir, latest))
|
l, latest, err = d.openLog(filepath.Join(logdir, latest))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("opening log: %w", err)
|
return xerrors.Errorf("opening log: %w", err)
|
||||||
}
|
}
|
||||||
@ -97,6 +97,8 @@ type logfile struct {
|
|||||||
file *os.File
|
file *os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var compactThresh = 2
|
||||||
|
|
||||||
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
|
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
|
||||||
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
|
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
|
||||||
log.Infow("creating log", "file", p)
|
log.Infow("creating log", "file", p)
|
||||||
@ -119,32 +121,36 @@ func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
|
|||||||
}, filepath.Base(p), nil
|
}, filepath.Base(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datastore) openLog(p string) (*logfile, error) {
|
func (d *Datastore) openLog(p string) (*logfile, string, error) {
|
||||||
log.Infow("opening log", "file", p)
|
log.Infow("opening log", "file", p)
|
||||||
lh, err := d.child.Get(loghead)
|
lh, err := d.child.Get(loghead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
|
return nil, "", xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lhp := strings.Split(string(lh), ";")
|
lhp := strings.Split(string(lh), ";")
|
||||||
if len(lhp) != 3 {
|
if len(lhp) != 3 {
|
||||||
return nil, xerrors.Errorf("expected loghead to have 3 parts")
|
return nil, "", xerrors.Errorf("expected loghead to have 3 parts")
|
||||||
}
|
}
|
||||||
|
|
||||||
if lhp[0] != filepath.Base(p) {
|
if lhp[0] != filepath.Base(p) {
|
||||||
return nil, xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
|
return nil, "", xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.OpenFile(p, os.O_RDWR, 0644)
|
f, err := os.OpenFile(p, os.O_RDWR, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastLogHead string
|
var lastLogHead string
|
||||||
var openCount, logvals int64
|
var openCount, vals, logvals int64
|
||||||
// check file integrity
|
// check file integrity
|
||||||
err = ReadBackup(f, func(k datastore.Key, v []byte) error {
|
err = ReadBackup(f, func(k datastore.Key, v []byte, log bool) error {
|
||||||
logvals++
|
if log {
|
||||||
|
logvals++
|
||||||
|
} else {
|
||||||
|
vals++
|
||||||
|
}
|
||||||
if k == loghead {
|
if k == loghead {
|
||||||
lastLogHead = string(v)
|
lastLogHead = string(v)
|
||||||
openCount++
|
openCount++
|
||||||
@ -152,32 +158,53 @@ func (d *Datastore) openLog(p string) (*logfile, error) {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
|
return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err)
|
||||||
}
|
}
|
||||||
if string(lh) != lastLogHead {
|
if string(lh) != lastLogHead {
|
||||||
return nil, xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
|
return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure we're at the end of the file
|
// make sure we're at the end of the file
|
||||||
at, err := f.Seek(0, io.SeekCurrent)
|
at, err := f.Seek(0, io.SeekCurrent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("get current logfile offset: %w", err)
|
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
|
||||||
}
|
}
|
||||||
end, err := f.Seek(0, io.SeekEnd)
|
end, err := f.Seek(0, io.SeekEnd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("get current logfile offset: %w", err)
|
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
|
||||||
}
|
}
|
||||||
if at != end {
|
if at != end {
|
||||||
return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
|
return nil, "", xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("log opened", "file", p, "openCount", openCount, "logValues", logvals)
|
compact := logvals > vals*int64(compactThresh)
|
||||||
|
if compact {
|
||||||
|
log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return nil, "", xerrors.Errorf("closing current log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
l, latest, err := d.createLog(filepath.Dir(p))
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", xerrors.Errorf("creating compacted log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("compacted log created, cleaning up old", "old", p, "new", latest)
|
||||||
|
if err := os.Remove(p); err != nil {
|
||||||
|
l.Close() // nolint
|
||||||
|
return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return l, latest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("log opened", "file", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
|
||||||
|
|
||||||
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
|
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
|
||||||
|
|
||||||
return &logfile{
|
return &logfile{
|
||||||
file: f,
|
file: f,
|
||||||
}, nil
|
}, filepath.Base(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
|
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error {
|
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) error {
|
||||||
scratch := make([]byte, 9)
|
scratch := make([]byte, 9)
|
||||||
|
|
||||||
// read array[2](
|
// read array[2](
|
||||||
@ -61,7 +61,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
|
|||||||
return xerrors.Errorf("reading value: %w", err)
|
return xerrors.Errorf("reading value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cb(key, value); err != nil {
|
if err := cb(key, value, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -110,7 +110,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
|
|||||||
|
|
||||||
key := datastore.NewKey(string(ent.Key))
|
key := datastore.NewKey(string(ent.Key))
|
||||||
|
|
||||||
if err := cb(key, ent.Value); err != nil {
|
if err := cb(key, ent.Value, true); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func RestoreInto(r io.Reader, dest datastore.Batching) error {
|
|||||||
return xerrors.Errorf("creating batch: %w", err)
|
return xerrors.Errorf("creating batch: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ReadBackup(r, func(key datastore.Key, value []byte) error {
|
err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error {
|
||||||
if err := batch.Put(key, value); err != nil {
|
if err := batch.Put(key, value); err != nil {
|
||||||
return xerrors.Errorf("put key: %w", err)
|
return xerrors.Errorf("put key: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user