lotus/journal/filesystem.go

132 lines
2.5 KiB
Go
Raw Normal View History

2020-07-17 13:22:37 +00:00
package journal
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
logging "github.com/ipfs/go-log"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/repo"
)
var log = logging.Logger("journal")
// fsJournal is a basic journal backed by files on a filesystem.
type fsJournal struct {
2020-07-17 17:35:15 +00:00
*eventTypeFactory
2020-07-17 13:22:37 +00:00
dir string
sizeLimit int64
lk sync.Mutex
fi *os.File
fSize int64
incoming chan *Entry
closing chan struct{}
}
// OpenFSJournal constructs a rolling filesystem journal, with a default
// per-file size limit of 1GiB.
2020-07-20 13:45:17 +00:00
func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) (Journal, error) {
2020-07-17 13:22:37 +00:00
dir := filepath.Join(lr.Path(), "journal")
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
}
f := &fsJournal{
2020-07-17 17:35:15 +00:00
eventTypeFactory: newEventTypeFactory(disabled),
dir: dir,
sizeLimit: 1 << 30,
incoming: make(chan *Entry, 32),
closing: make(chan struct{}),
2020-07-17 13:22:37 +00:00
}
if err := f.rollJournalFile(); err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return f.Close() },
})
go f.runLoop()
return f, nil
}
func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) {
je := &Entry{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: obj,
}
select {
case f.incoming <- je:
case <-f.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}
func (f *fsJournal) Close() error {
close(f.closing)
return nil
}
func (f *fsJournal) putEntry(je *Entry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := f.fi.Write(append(b, '\n'))
if err != nil {
return err
}
f.fSize += int64(n)
if f.fSize >= f.sizeLimit {
_ = f.rollJournalFile()
}
return nil
}
func (f *fsJournal) rollJournalFile() error {
if f.fi != nil {
_ = f.fi.Close()
}
nfi, err := os.Create(filepath.Join(f.dir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
f.fi = nfi
f.fSize = 0
return nil
}
func (f *fsJournal) runLoop() {
for {
select {
case je := <-f.incoming:
if err := f.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-f.closing:
_ = f.fi.Close()
return
}
}
}