From bb79eba74c9558c419bec5f3629f6921edf9bcfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Mar 2021 21:16:42 +0100 Subject: [PATCH] backupds: Compact log on restart --- lib/backupds/log.go | 59 ++++++++++++++++++++++++++++++++------------ lib/backupds/read.go | 8 +++--- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/lib/backupds/log.go b/lib/backupds/log.go index 85db600ef..9473299c0 100644 --- a/lib/backupds/log.go +++ b/lib/backupds/log.go @@ -55,7 +55,7 @@ func (d *Datastore) startLog(logdir string) error { return xerrors.Errorf("creating log: %w", err) } } else { - l, err = d.openLog(filepath.Join(logdir, latest)) + l, latest, err = d.openLog(filepath.Join(logdir, latest)) if err != nil { return xerrors.Errorf("opening log: %w", err) } @@ -97,6 +97,8 @@ type logfile struct { file *os.File } +var compactThresh = 2 + func (d *Datastore) createLog(logdir string) (*logfile, string, error) { p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor") log.Infow("creating log", "file", p) @@ -119,32 +121,36 @@ func (d *Datastore) createLog(logdir string) (*logfile, string, error) { }, filepath.Base(p), nil } -func (d *Datastore) openLog(p string) (*logfile, error) { +func (d *Datastore) openLog(p string) (*logfile, string, error) { log.Infow("opening log", "file", p) lh, err := d.child.Get(loghead) if err != nil { - return nil, xerrors.Errorf("checking log head (logfile '%s'): %w", p, err) + return nil, "", xerrors.Errorf("checking log head (logfile '%s'): %w", p, err) } lhp := strings.Split(string(lh), ";") if len(lhp) != 3 { - return nil, xerrors.Errorf("expected loghead to have 3 parts") + return nil, "", xerrors.Errorf("expected loghead to have 3 parts") } if lhp[0] != filepath.Base(p) { - return nil, xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0]) + return nil, "", xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0]) } f, err := os.OpenFile(p, os.O_RDWR, 0644) if err != nil { - return nil, err + return nil, "", err } var lastLogHead string - var openCount, logvals int64 + var openCount, vals, logvals int64 // check file integrity - err = ReadBackup(f, func(k datastore.Key, v []byte) error { - logvals++ + err = ReadBackup(f, func(k datastore.Key, v []byte, log bool) error { + if log { + logvals++ + } else { + vals++ + } if k == loghead { lastLogHead = string(v) openCount++ @@ -152,32 +158,53 @@ func (d *Datastore) openLog(p string) (*logfile, error) { return nil }) if err != nil { - return nil, xerrors.Errorf("reading backup part of the logfile: %w", err) + return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err) } if string(lh) != lastLogHead { - return nil, xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead) + return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead) } // make sure we're at the end of the file at, err := f.Seek(0, io.SeekCurrent) if err != nil { - return nil, xerrors.Errorf("get current logfile offset: %w", err) + return nil, "", xerrors.Errorf("get current logfile offset: %w", err) } end, err := f.Seek(0, io.SeekEnd) if err != nil { - return nil, xerrors.Errorf("get current logfile offset: %w", err) + return nil, "", xerrors.Errorf("get current logfile offset: %w", err) } if at != end { - return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at) + return nil, "", xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at) } - log.Infow("log opened", "file", p, "openCount", openCount, "logValues", logvals) + compact := logvals > vals * int64(compactThresh) + if compact { + log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals) + if err := f.Close(); err != nil { + return nil, "", xerrors.Errorf("closing current log: %w", err) + } + + l, latest, err := d.createLog(filepath.Dir(p)) + if err != nil { + return nil, "", xerrors.Errorf("creating compacted log: %w", err) + } + + log.Infow("compacted log created, cleaning up old", "old", p, "new", latest) + if err := os.Remove(p); err != nil { + l.Close() // nolint + return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err) + } + + return l, latest, nil + } + + log.Infow("log opened", "file", p, "openCount", openCount, "baseValues", vals, "logValues", logvals) // todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption return &logfile{ file: f, - }, nil + }, filepath.Base(p), nil } func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error { diff --git a/lib/backupds/read.go b/lib/backupds/read.go index 7c8e33e74..3d6d24139 100644 --- a/lib/backupds/read.go +++ b/lib/backupds/read.go @@ -11,7 +11,7 @@ import ( "golang.org/x/xerrors" ) -func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error { +func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) error { scratch := make([]byte, 9) // read array[2]( @@ -61,7 +61,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err return xerrors.Errorf("reading value: %w", err) } - if err := cb(key, value); err != nil { + if err := cb(key, value, false); err != nil { return err } } @@ -110,7 +110,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err key := datastore.NewKey(string(ent.Key)) - if err := cb(key, ent.Value); err != nil { + if err := cb(key, ent.Value, true); err != nil { return err } } @@ -122,7 +122,7 @@ func RestoreInto(r io.Reader, dest datastore.Batching) error { return xerrors.Errorf("creating batch: %w", err) } - err = ReadBackup(r, func(key datastore.Key, value []byte) error { + err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error { if err := batch.Put(key, value); err != nil { return xerrors.Errorf("put key: %w", err) }