lotus/lib/backupds/log.go

251 lines
6.2 KiB
Go
Raw Normal View History

2021-03-09 21:33:01 +00:00
package backupds
import (
"fmt"
2021-03-09 21:33:01 +00:00
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/google/uuid"
2021-03-09 21:33:01 +00:00
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
)
// loghead is the datastore key under which the current log head is stored.
// writeLogHead writes the same value both as a logfile entry with this key and
// into the child datastore; openLog compares the two to validate a logfile.
// The value format is given in the trailing comment.
var loghead = datastore.NewKey("/backupds/log/head") // string([logfile base name];[uuid];[unix ts])
2021-03-09 21:33:01 +00:00
func (d *Datastore) startLog(logdir string) error {
if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
}
files, err := ioutil.ReadDir(logdir)
if err != nil {
return xerrors.Errorf("read logdir ('%s'): %w", logdir, err)
}
var latest string
var latestTs int64
for _, file := range files {
fn := file.Name()
if !strings.HasSuffix(fn, ".log.cbor") {
log.Warn("logfile with wrong file extension", fn)
continue
}
sec, err := strconv.ParseInt(fn[:len(".log.cbor")], 10, 64)
if err != nil {
return xerrors.Errorf("parsing logfile as a number: %w", err)
}
if sec > latestTs {
latestTs = sec
latest = file.Name()
}
}
var l *logfile
if latest == "" {
l, latest, err = d.createLog(logdir)
2021-03-09 21:33:01 +00:00
if err != nil {
return xerrors.Errorf("creating log: %w", err)
}
} else {
2021-03-24 20:16:42 +00:00
l, latest, err = d.openLog(filepath.Join(logdir, latest))
2021-03-09 21:33:01 +00:00
if err != nil {
return xerrors.Errorf("opening log: %w", err)
}
}
if err := l.writeLogHead(latest, d.child); err != nil {
return xerrors.Errorf("writing new log head: %w", err)
}
2021-03-10 12:58:02 +00:00
go d.runLog(l)
2021-03-09 21:33:01 +00:00
return nil
}
2021-03-10 12:58:02 +00:00
// runLog is the log writer loop started by startLog. It receives entries from
// d.log, appends each one to l and syncs the file after every entry. When
// d.closing becomes readable it closes l and returns. d.closed is closed when
// the function returns. Failures are logged and do not stop the loop.
func (d *Datastore) runLog(l *logfile) {
	defer close(d.closed)

	for {
		select {
		case <-d.closing:
			if closeErr := l.Close(); closeErr != nil {
				log.Errorw("failed to close log", "error", closeErr)
			}
			return

		case entry := <-d.log:
			if writeErr := l.writeEntry(&entry); writeErr != nil {
				log.Errorw("failed to write log entry", "error", writeErr)
				// todo try to do something, maybe start a new log file (but not when we're out of disk space)
			}

			// todo: batch writes when multiple are pending; flush on a timer
			// The sync is attempted even when the write above failed.
			if syncErr := l.file.Sync(); syncErr != nil {
				log.Errorw("failed to sync log", "error", syncErr)
			}
		}
	}
}
2021-03-09 21:33:01 +00:00
// logfile wraps the open file of a single backup log. Entries are appended to
// it by writeEntry, and Close releases the file.
type logfile struct {
// file is the underlying log file; Close sets it to nil after a successful close.
file *os.File
}
2021-03-24 20:16:42 +00:00
// compactThresh controls when openLog compacts a logfile: compaction happens
// when the number of log entries read from the file exceeds the number of base
// (backup) values multiplied by compactThresh.
var compactThresh = 2
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
2021-03-09 21:33:01 +00:00
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
log.Infow("creating log", "file", p)
2021-03-09 21:33:12 +00:00
f, err := os.OpenFile(p, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
2021-03-09 21:33:01 +00:00
if err != nil {
return nil, "", err
2021-03-09 21:33:01 +00:00
}
if err := d.Backup(f); err != nil {
return nil, "", xerrors.Errorf("writing log base: %w", err)
2021-03-09 21:33:01 +00:00
}
if err := f.Sync(); err != nil {
return nil, "", xerrors.Errorf("sync log base: %w", err)
2021-03-09 21:33:01 +00:00
}
log.Infow("log opened", "file", p)
return &logfile{
file: f,
}, filepath.Base(p), nil
2021-03-09 21:33:01 +00:00
}
2021-03-24 20:16:42 +00:00
func (d *Datastore) openLog(p string) (*logfile, string, error) {
2021-03-09 21:33:01 +00:00
log.Infow("opening log", "file", p)
lh, err := d.child.Get(loghead)
if err != nil {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
}
lhp := strings.Split(string(lh), ";")
if len(lhp) != 3 {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("expected loghead to have 3 parts")
}
if lhp[0] != filepath.Base(p) {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
}
2021-03-09 21:33:01 +00:00
f, err := os.OpenFile(p, os.O_RDWR, 0644)
if err != nil {
2021-03-24 20:16:42 +00:00
return nil, "", err
2021-03-09 21:33:01 +00:00
}
var lastLogHead string
2021-03-24 20:16:42 +00:00
var openCount, vals, logvals int64
2021-03-09 21:33:01 +00:00
// check file integrity
2021-03-24 20:16:42 +00:00
err = ReadBackup(f, func(k datastore.Key, v []byte, log bool) error {
if log {
logvals++
} else {
vals++
}
if k == loghead {
lastLogHead = string(v)
openCount++
}
2021-03-09 21:33:01 +00:00
return nil
})
if err != nil {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err)
2021-03-09 21:33:01 +00:00
}
if string(lh) != lastLogHead {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
}
2021-03-09 21:33:01 +00:00
// make sure we're at the end of the file
at, err := f.Seek(0, io.SeekCurrent)
if err != nil {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
2021-03-09 21:33:01 +00:00
}
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
2021-03-09 21:33:01 +00:00
}
if at != end {
2021-03-24 20:16:42 +00:00
return nil, "", xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
2021-03-09 21:33:01 +00:00
}
2021-03-24 20:16:42 +00:00
compact := logvals > vals * int64(compactThresh)
if compact {
log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
if err := f.Close(); err != nil {
return nil, "", xerrors.Errorf("closing current log: %w", err)
}
l, latest, err := d.createLog(filepath.Dir(p))
if err != nil {
return nil, "", xerrors.Errorf("creating compacted log: %w", err)
}
log.Infow("compacted log created, cleaning up old", "old", p, "new", latest)
if err := os.Remove(p); err != nil {
l.Close() // nolint
return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err)
}
return l, latest, nil
}
log.Infow("log opened", "file", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
2021-03-09 21:33:01 +00:00
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
return &logfile{
file: f,
2021-03-24 20:16:42 +00:00
}, filepath.Base(p), nil
2021-03-09 21:33:01 +00:00
}
// writeLogHead generates a new log head of the form
// "<logname>;<uuid>;<unix seconds>", appends it to the logfile as an entry
// keyed by loghead, and then stores the same value under loghead in ds.
// The logfile is written first; ds is only updated if that write succeeded.
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
	head := fmt.Sprintf("%s;%s;%d", logname, uuid.New(), time.Now().Unix())
	lval := []byte(head)

	entry := Entry{
		Key:       loghead.Bytes(),
		Value:     lval,
		Timestamp: time.Now().Unix(),
	}
	if err := l.writeEntry(&entry); err != nil {
		return xerrors.Errorf("writing loghead to the log: %w", err)
	}

	if err := ds.Put(loghead, lval); err != nil {
		return xerrors.Errorf("writing loghead to the datastore: %w", err)
	}

	log.Infow("new log head", "loghead", head)
	return nil
}
2021-03-09 21:33:01 +00:00
// writeEntry CBOR-marshals e directly into the logfile. It does not sync the
// file. A marshalling or write failure is returned wrapped.
func (l *logfile) writeEntry(e *Entry) error {
	// todo: maybe marshal to some temp buffer, then put into the file?
	err := e.MarshalCBOR(l.file)
	if err == nil {
		return nil
	}
	return xerrors.Errorf("writing log entry: %w", err)
}
// Close closes the underlying file. On success the file field is reset to nil;
// on failure the error is returned unchanged and the field is left as it was.
func (l *logfile) Close() error {
	// todo: maybe write a magic 'close at' entry; pad the log to filesystem page to prevent more exotic types of corruption
	err := l.file.Close()
	if err == nil {
		l.file = nil
	}
	return err
}