backupds kvlog: Address review
This commit is contained in:
parent
2b380c96a5
commit
3f1054daf4
@ -6,6 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/multierr"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -189,11 +190,10 @@ func (d *Datastore) CloseLog() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datastore) Close() error {
|
func (d *Datastore) Close() error {
|
||||||
if err := d.child.Close(); err != nil {
|
return multierr.Combine(
|
||||||
return xerrors.Errorf("closing child datastore: %w", err)
|
d.child.Close(),
|
||||||
}
|
d.CloseLog(),
|
||||||
|
)
|
||||||
return d.CloseLog()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datastore) Batch() (datastore.Batch, error) {
|
func (d *Datastore) Batch() (datastore.Batch, error) {
|
||||||
|
@ -65,32 +65,34 @@ func (d *Datastore) startLog(logdir string) error {
|
|||||||
return xerrors.Errorf("writing new log head: %w", err)
|
return xerrors.Errorf("writing new log head: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go d.runLog(l)
|
||||||
defer close(d.closed)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ent := <-d.log:
|
|
||||||
if err := l.writeEntry(&ent); err != nil {
|
|
||||||
log.Errorw("failed to write log entry", "error", err)
|
|
||||||
// todo try to do something, maybe start a new log file (but not when we're out of disk space)
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo: batch writes when multiple are pending; flush on a timer
|
|
||||||
if err := l.file.Sync(); err != nil {
|
|
||||||
log.Errorw("failed to sync log", "error", err)
|
|
||||||
}
|
|
||||||
case <-d.closing:
|
|
||||||
if err := l.Close(); err != nil {
|
|
||||||
log.Errorw("failed to close log", "error", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Datastore) runLog(l *logfile) {
|
||||||
|
defer close(d.closed)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ent := <-d.log:
|
||||||
|
if err := l.writeEntry(&ent); err != nil {
|
||||||
|
log.Errorw("failed to write log entry", "error", err)
|
||||||
|
// todo try to do something, maybe start a new log file (but not when we're out of disk space)
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: batch writes when multiple are pending; flush on a timer
|
||||||
|
if err := l.file.Sync(); err != nil {
|
||||||
|
log.Errorw("failed to sync log", "error", err)
|
||||||
|
}
|
||||||
|
case <-d.closing:
|
||||||
|
if err := l.Close(); err != nil {
|
||||||
|
log.Errorw("failed to close log", "error", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type logfile struct {
|
type logfile struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user