backupds: make sure logfile matches datastore
This commit is contained in:
parent
9f7d3ed340
commit
b4eefd7220
@ -1,6 +1,7 @@
|
|||||||
package backupds
|
package backupds
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
@ -9,11 +10,14 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var loghead = datastore.NewKey("/backupds/log/head") // string([logfile base name];[uuid];[unix ts])
|
||||||
|
|
||||||
func (d *Datastore) startLog(logdir string) error {
|
func (d *Datastore) startLog(logdir string) error {
|
||||||
if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
|
if err := os.MkdirAll(logdir, 0755); err != nil && !os.IsExist(err) {
|
||||||
return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
|
return xerrors.Errorf("mkdir logdir ('%s'): %w", logdir, err)
|
||||||
@ -46,17 +50,21 @@ func (d *Datastore) startLog(logdir string) error {
|
|||||||
|
|
||||||
var l *logfile
|
var l *logfile
|
||||||
if latest == "" {
|
if latest == "" {
|
||||||
l, err = d.createLog(logdir)
|
l, latest, err = d.createLog(logdir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("creating log: %w", err)
|
return xerrors.Errorf("creating log: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
l, err = openLog(filepath.Join(logdir, latest))
|
l, err = d.openLog(filepath.Join(logdir, latest))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("opening log: %w", err)
|
return xerrors.Errorf("opening log: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := l.writeLogHead(latest, d.child); err != nil {
|
||||||
|
return xerrors.Errorf("writing new log head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(d.closed)
|
defer close(d.closed)
|
||||||
for {
|
for {
|
||||||
@ -86,45 +94,66 @@ type logfile struct {
|
|||||||
file *os.File
|
file *os.File
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datastore) createLog(logdir string) (*logfile, error) {
|
func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
|
||||||
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
|
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
|
||||||
log.Infow("creating log", "file", p)
|
log.Infow("creating log", "file", p)
|
||||||
|
|
||||||
f, err := os.OpenFile(p, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
|
f, err := os.OpenFile(p, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := d.Backup(f); err != nil {
|
if err := d.Backup(f); err != nil {
|
||||||
return nil, xerrors.Errorf("writing log base: %w", err)
|
return nil, "", xerrors.Errorf("writing log base: %w", err)
|
||||||
}
|
}
|
||||||
if err := f.Sync(); err != nil {
|
if err := f.Sync(); err != nil {
|
||||||
return nil, xerrors.Errorf("sync log base: %w", err)
|
return nil, "", xerrors.Errorf("sync log base: %w", err)
|
||||||
}
|
}
|
||||||
log.Infow("log opened", "file", p)
|
log.Infow("log opened", "file", p)
|
||||||
|
|
||||||
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
|
|
||||||
|
|
||||||
return &logfile{
|
return &logfile{
|
||||||
file: f,
|
file: f,
|
||||||
}, nil
|
}, filepath.Base(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func openLog(p string) (*logfile, error) {
|
func (d *Datastore) openLog(p string) (*logfile, error) {
|
||||||
log.Infow("opening log", "file", p)
|
log.Infow("opening log", "file", p)
|
||||||
|
lh, err := d.child.Get(loghead)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lhp := strings.Split(string(lh), ";")
|
||||||
|
if len(lhp) != 3 {
|
||||||
|
return nil, xerrors.Errorf("expected loghead to have 3 parts")
|
||||||
|
}
|
||||||
|
|
||||||
|
if lhp[0] != filepath.Base(p) {
|
||||||
|
return nil, xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
|
||||||
|
}
|
||||||
|
|
||||||
f, err := os.OpenFile(p, os.O_RDWR, 0644)
|
f, err := os.OpenFile(p, os.O_RDWR, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var lastLogHead string
|
||||||
|
var openCount, logvals int64
|
||||||
// check file integrity
|
// check file integrity
|
||||||
err = ReadBackup(f, func(_ datastore.Key, _ []byte) error {
|
err = ReadBackup(f, func(k datastore.Key, v []byte) error {
|
||||||
|
logvals++
|
||||||
|
if k == loghead {
|
||||||
|
lastLogHead = string(v)
|
||||||
|
openCount++
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
|
return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
|
||||||
}
|
}
|
||||||
log.Infow("log opened", "file", p)
|
if string(lh) != lastLogHead {
|
||||||
|
return nil, xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
|
||||||
|
}
|
||||||
|
|
||||||
// make sure we're at the end of the file
|
// make sure we're at the end of the file
|
||||||
at, err := f.Seek(0, io.SeekCurrent)
|
at, err := f.Seek(0, io.SeekCurrent)
|
||||||
@ -139,6 +168,8 @@ func openLog(p string) (*logfile, error) {
|
|||||||
return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
|
return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infow("log opened", "file", p, "openCount", openCount, "logValues", logvals)
|
||||||
|
|
||||||
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
|
// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption
|
||||||
|
|
||||||
return &logfile{
|
return &logfile{
|
||||||
@ -146,6 +177,27 @@ func openLog(p string) (*logfile, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
|
||||||
|
lval := []byte(fmt.Sprintf("%s;%s;%d", logname, uuid.New(), time.Now().Unix()))
|
||||||
|
|
||||||
|
err := l.writeEntry(&Entry{
|
||||||
|
Key: loghead.Bytes(),
|
||||||
|
Value: lval,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("writing loghead to the log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ds.Put(loghead, lval); err != nil {
|
||||||
|
return xerrors.Errorf("writing loghead to the datastore: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("now log head", "loghead", string(lval))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (l *logfile) writeEntry(e *Entry) error {
|
func (l *logfile) writeEntry(e *Entry) error {
|
||||||
// todo: maybe marshal to some temp buffer, then put into the file?
|
// todo: maybe marshal to some temp buffer, then put into the file?
|
||||||
if err := e.MarshalCBOR(l.file); err != nil {
|
if err := e.MarshalCBOR(l.file); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user