lotus/journal/fsjournal/fs.go

155 lines
3.1 KiB
Go
Raw Normal View History

2021-08-17 12:55:35 +00:00
package fsjournal
2020-07-17 13:22:37 +00:00
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
2021-08-17 12:55:35 +00:00
logging "github.com/ipfs/go-log/v2"
2020-07-17 13:22:37 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
2021-08-17 12:55:35 +00:00
"github.com/filecoin-project/lotus/journal"
2020-07-17 13:22:37 +00:00
"github.com/filecoin-project/lotus/node/repo"
)
2021-08-17 12:55:35 +00:00
// log is the package-level logger, registered under the name "fsjournal".
var log = logging.Logger("fsjournal")

// RFC3339nocolon is a time layout resembling RFC 3339 but without colons in
// the time-of-day and zone offset. It is used to timestamp the names of
// rolled journal files.
// NOTE(review): the colons are presumably omitted to keep file names portable
// across filesystems — confirm.
const RFC3339nocolon = "2006-01-02T150405Z0700"
2020-07-17 13:22:37 +00:00
// fsJournal is a basic journal backed by files on a filesystem.
type fsJournal struct {
2021-08-17 12:55:35 +00:00
journal.EventTypeRegistry
2020-07-17 13:22:37 +00:00
dir string
sizeLimit int64
fi *os.File
fSize int64
2021-08-17 12:55:35 +00:00
incoming chan *journal.Event
2020-07-17 13:22:37 +00:00
closing chan struct{}
2020-08-11 12:48:32 +00:00
closed chan struct{}
2020-07-17 13:22:37 +00:00
}
// OpenFSJournal constructs a rolling filesystem journal, with a default
// per-file size limit of 1GiB.
2021-08-17 12:55:35 +00:00
func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal.Journal, error) {
2020-07-17 13:22:37 +00:00
dir := filepath.Join(lr.Path(), "journal")
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
}
f := &fsJournal{
2021-08-17 12:55:35 +00:00
EventTypeRegistry: journal.NewEventTypeRegistry(disabled),
2020-08-26 15:09:37 +00:00
dir: dir,
sizeLimit: 1 << 30,
2021-08-17 12:55:35 +00:00
incoming: make(chan *journal.Event, 32),
2020-08-26 15:09:37 +00:00
closing: make(chan struct{}),
closed: make(chan struct{}),
2020-07-17 13:22:37 +00:00
}
if err := f.rollJournalFile(); err != nil {
return nil, err
}
go f.runLoop()
return f, nil
}
2021-08-17 12:55:35 +00:00
func (f *fsJournal) RecordEvent(evtType journal.EventType, supplier func() interface{}) {
2020-08-26 15:09:37 +00:00
defer func() {
if r := recover(); r != nil {
log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r)
}
}()
if !evtType.Enabled() {
return
}
2021-08-17 12:55:35 +00:00
je := &journal.Event{
2020-07-17 13:22:37 +00:00
EventType: evtType,
Timestamp: build.Clock.Now(),
2020-08-26 15:09:37 +00:00
Data: supplier(),
2020-07-17 13:22:37 +00:00
}
select {
case f.incoming <- je:
case <-f.closing:
log.Warnw("journal closed but tried to log event", "event", je)
2020-07-17 13:22:37 +00:00
}
}
func (f *fsJournal) Close() error {
close(f.closing)
2020-08-11 12:48:32 +00:00
<-f.closed
2020-07-17 13:22:37 +00:00
return nil
}
2021-08-17 12:55:35 +00:00
func (f *fsJournal) putEvent(evt *journal.Event) error {
b, err := json.Marshal(evt)
2020-07-17 13:22:37 +00:00
if err != nil {
return err
}
n, err := f.fi.Write(append(b, '\n'))
if err != nil {
return err
}
f.fSize += int64(n)
if f.fSize >= f.sizeLimit {
_ = f.rollJournalFile()
}
return nil
}
func (f *fsJournal) rollJournalFile() error {
if f.fi != nil {
_ = f.fi.Close()
}
current := filepath.Join(f.dir, "lotus-journal.ndjson")
rolled := filepath.Join(f.dir, fmt.Sprintf(
"lotus-journal-%s.ndjson",
build.Clock.Now().Format(RFC3339nocolon),
))
// check if journal file exists
if fi, err := os.Stat(current); err == nil && !fi.IsDir() {
err := os.Rename(current, rolled)
if err != nil {
return xerrors.Errorf("failed to roll journal file: %w", err)
}
}
2020-07-17 13:22:37 +00:00
nfi, err := os.Create(current)
2020-07-17 13:22:37 +00:00
if err != nil {
return xerrors.Errorf("failed to create journal file: %w", err)
2020-07-17 13:22:37 +00:00
}
f.fi = nfi
f.fSize = 0
2020-07-17 13:22:37 +00:00
return nil
}
func (f *fsJournal) runLoop() {
2020-08-11 12:48:32 +00:00
defer close(f.closed)
2020-07-17 13:22:37 +00:00
for {
select {
case je := <-f.incoming:
if err := f.putEvent(je); err != nil {
log.Errorw("failed to write out journal event", "event", je, "err", err)
2020-07-17 13:22:37 +00:00
}
case <-f.closing:
_ = f.fi.Close()
return
}
}
}