Merge pull request #5891 from filecoin-project/feat/backupds-trunc-log-handling
backupds: Improve truncated log handling
This commit is contained in:
commit
0cfc76c078
@ -181,7 +181,7 @@ var datastoreBackupStatCmd = &cli.Command{
|
|||||||
defer f.Close() // nolint:errcheck
|
defer f.Close() // nolint:errcheck
|
||||||
|
|
||||||
var keys, logs, kbytes, vbytes uint64
|
var keys, logs, kbytes, vbytes uint64
|
||||||
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, log bool) error {
|
clean, err := backupds.ReadBackup(f, func(key datastore.Key, value []byte, log bool) error {
|
||||||
if log {
|
if log {
|
||||||
logs++
|
logs++
|
||||||
}
|
}
|
||||||
@ -194,6 +194,7 @@ var datastoreBackupStatCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fmt.Println("Truncated: ", !clean)
|
||||||
fmt.Println("Keys: ", keys)
|
fmt.Println("Keys: ", keys)
|
||||||
fmt.Println("Log values: ", log)
|
fmt.Println("Log values: ", log)
|
||||||
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
|
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
|
||||||
@ -229,7 +230,7 @@ var datastoreBackupListCmd = &cli.Command{
|
|||||||
defer f.Close() // nolint:errcheck
|
defer f.Close() // nolint:errcheck
|
||||||
|
|
||||||
printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
|
printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
|
||||||
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, _ bool) error {
|
_, err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, _ bool) error {
|
||||||
return printKv(key.String(), value)
|
return printKv(key.String(), value)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -145,7 +145,7 @@ func (d *Datastore) openLog(p string) (*logfile, string, error) {
|
|||||||
var lastLogHead string
|
var lastLogHead string
|
||||||
var openCount, vals, logvals int64
|
var openCount, vals, logvals int64
|
||||||
// check file integrity
|
// check file integrity
|
||||||
err = ReadBackup(f, func(k datastore.Key, v []byte, log bool) error {
|
clean, err := ReadBackup(f, func(k datastore.Key, v []byte, log bool) error {
|
||||||
if log {
|
if log {
|
||||||
logvals++
|
logvals++
|
||||||
} else {
|
} else {
|
||||||
@ -160,7 +160,7 @@ func (d *Datastore) openLog(p string) (*logfile, string, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err)
|
return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err)
|
||||||
}
|
}
|
||||||
if string(lh) != lastLogHead {
|
if string(lh) != lastLogHead && clean { // if not clean, user has opted in to ignore truncated logs, this will almost certainly happen
|
||||||
return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
|
return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,8 +178,8 @@ func (d *Datastore) openLog(p string) (*logfile, string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
compact := logvals > vals*int64(compactThresh)
|
compact := logvals > vals*int64(compactThresh)
|
||||||
if compact {
|
if compact || !clean {
|
||||||
log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
|
log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals, "truncated", !clean)
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
return nil, "", xerrors.Errorf("closing current log: %w", err)
|
return nil, "", xerrors.Errorf("closing current log: %w", err)
|
||||||
}
|
}
|
||||||
@ -189,10 +189,14 @@ func (d *Datastore) openLog(p string) (*logfile, string, error) {
|
|||||||
return nil, "", xerrors.Errorf("creating compacted log: %w", err)
|
return nil, "", xerrors.Errorf("creating compacted log: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("compacted log created, cleaning up old", "old", p, "new", latest)
|
if clean {
|
||||||
if err := os.Remove(p); err != nil {
|
log.Infow("compacted log created, cleaning up old", "old", p, "new", latest)
|
||||||
l.Close() // nolint
|
if err := os.Remove(p); err != nil {
|
||||||
return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err)
|
l.Close() // nolint
|
||||||
|
return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Errorw("LOG FILE WAS TRUNCATED, KEEPING THE FILE", "old", p, "new", latest)
|
||||||
}
|
}
|
||||||
|
|
||||||
return l, latest, nil
|
return l, latest, nil
|
||||||
|
@ -11,16 +11,16 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) error {
|
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) (bool, error) {
|
||||||
scratch := make([]byte, 9)
|
scratch := make([]byte, 9)
|
||||||
|
|
||||||
// read array[2](
|
// read array[2](
|
||||||
if _, err := r.Read(scratch[:1]); err != nil {
|
if _, err := r.Read(scratch[:1]); err != nil {
|
||||||
return xerrors.Errorf("reading array header: %w", err)
|
return false, xerrors.Errorf("reading array header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if scratch[0] != 0x82 {
|
if scratch[0] != 0x82 {
|
||||||
return xerrors.Errorf("expected array(2) header byte 0x82, got %x", scratch[0])
|
return false, xerrors.Errorf("expected array(2) header byte 0x82, got %x", scratch[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
hasher := sha256.New()
|
hasher := sha256.New()
|
||||||
@ -28,16 +28,16 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool)
|
|||||||
|
|
||||||
// read array[*](
|
// read array[*](
|
||||||
if _, err := hr.Read(scratch[:1]); err != nil {
|
if _, err := hr.Read(scratch[:1]); err != nil {
|
||||||
return xerrors.Errorf("reading array header: %w", err)
|
return false, xerrors.Errorf("reading array header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if scratch[0] != 0x9f {
|
if scratch[0] != 0x9f {
|
||||||
return xerrors.Errorf("expected indefinite length array header byte 0x9f, got %x", scratch[0])
|
return false, xerrors.Errorf("expected indefinite length array header byte 0x9f, got %x", scratch[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if _, err := hr.Read(scratch[:1]); err != nil {
|
if _, err := hr.Read(scratch[:1]); err != nil {
|
||||||
return xerrors.Errorf("reading tuple header: %w", err)
|
return false, xerrors.Errorf("reading tuple header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// close array[*]
|
// close array[*]
|
||||||
@ -47,22 +47,22 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool)
|
|||||||
|
|
||||||
// read array[2](key:[]byte, value:[]byte)
|
// read array[2](key:[]byte, value:[]byte)
|
||||||
if scratch[0] != 0x82 {
|
if scratch[0] != 0x82 {
|
||||||
return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0])
|
return false, xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
keyb, err := cbg.ReadByteArray(hr, 1<<40)
|
keyb, err := cbg.ReadByteArray(hr, 1<<40)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading key: %w", err)
|
return false, xerrors.Errorf("reading key: %w", err)
|
||||||
}
|
}
|
||||||
key := datastore.NewKey(string(keyb))
|
key := datastore.NewKey(string(keyb))
|
||||||
|
|
||||||
value, err := cbg.ReadByteArray(hr, 1<<40)
|
value, err := cbg.ReadByteArray(hr, 1<<40)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading value: %w", err)
|
return false, xerrors.Errorf("reading value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cb(key, value, false); err != nil {
|
if err := cb(key, value, false); err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,11 +71,11 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool)
|
|||||||
// read the [32]byte checksum
|
// read the [32]byte checksum
|
||||||
expSum, err := cbg.ReadByteArray(r, 32)
|
expSum, err := cbg.ReadByteArray(r, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading expected checksum: %w", err)
|
return false, xerrors.Errorf("reading expected checksum: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(sum, expSum) {
|
if !bytes.Equal(sum, expSum) {
|
||||||
return xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
|
return false, xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
|
||||||
}
|
}
|
||||||
|
|
||||||
// read the log, set of Entry-ies
|
// read the log, set of Entry-ies
|
||||||
@ -86,32 +86,32 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool)
|
|||||||
_, err := bp.ReadByte()
|
_, err := bp.ReadByte()
|
||||||
switch err {
|
switch err {
|
||||||
case io.EOF, io.ErrUnexpectedEOF:
|
case io.EOF, io.ErrUnexpectedEOF:
|
||||||
return nil
|
return true, nil
|
||||||
case nil:
|
case nil:
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("peek log: %w", err)
|
return false, xerrors.Errorf("peek log: %w", err)
|
||||||
}
|
}
|
||||||
if err := bp.UnreadByte(); err != nil {
|
if err := bp.UnreadByte(); err != nil {
|
||||||
return xerrors.Errorf("unread log byte: %w", err)
|
return false, xerrors.Errorf("unread log byte: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ent.UnmarshalCBOR(bp); err != nil {
|
if err := ent.UnmarshalCBOR(bp); err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
case io.EOF, io.ErrUnexpectedEOF:
|
case io.EOF, io.ErrUnexpectedEOF:
|
||||||
if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" {
|
if os.Getenv("LOTUS_ALLOW_TRUNCATED_LOG") == "1" {
|
||||||
panic("handleme; just ignore and tell the caller about the corrupted file") // todo
|
log.Errorw("log entry potentially truncated")
|
||||||
} else {
|
return false, nil
|
||||||
return xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err)
|
|
||||||
}
|
}
|
||||||
|
return false, xerrors.Errorf("log entry potentially truncated, set LOTUS_ALLOW_TRUNCATED_LOG=1 to proceed: %w", err)
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("unmarshaling log entry: %w", err)
|
return false, xerrors.Errorf("unmarshaling log entry: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
key := datastore.NewKey(string(ent.Key))
|
key := datastore.NewKey(string(ent.Key))
|
||||||
|
|
||||||
if err := cb(key, ent.Value, true); err != nil {
|
if err := cb(key, ent.Value, true); err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func RestoreInto(r io.Reader, dest datastore.Batching) error {
|
|||||||
return xerrors.Errorf("creating batch: %w", err)
|
return xerrors.Errorf("creating batch: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error {
|
_, err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error {
|
||||||
if err := batch.Put(key, value); err != nil {
|
if err := batch.Put(key, value); err != nil {
|
||||||
return xerrors.Errorf("put key: %w", err)
|
return xerrors.Errorf("put key: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user