5733c71c50
We were ignoring quite a few error cases, and had one case where we weren't actually updating state where we wanted to. Unfortunately, if the linter doesn't pass, nobody has any reason to actually check lint failures in CI. There are three remaining XXXs marked in the code for lint.
153 lines
2.7 KiB
Go
153 lines
2.7 KiB
Go
package journal
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
logging "github.com/ipfs/go-log"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
)
|
|
|
|
func InitializeSystemJournal(dir string) error {
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return err
|
|
}
|
|
j, err := OpenFSJournal(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
currentJournal = j
|
|
return nil
|
|
}
|
|
|
|
func Add(sys string, val interface{}) {
|
|
if currentJournal == nil {
|
|
log.Warn("no journal configured")
|
|
return
|
|
}
|
|
currentJournal.AddEntry(sys, val)
|
|
}
|
|
|
|
var log = logging.Logger("journal")
|
|
|
|
var currentJournal Journal
|
|
|
|
// Journal is a sink for structured journal entries.
type Journal interface {
	// Close shuts the journal down and reports any error from doing so.
	Close() error

	// AddEntry records obj in the journal, attributed to the named system.
	AddEntry(system string, obj interface{})
}
|
|
|
|
// fsJournal is a basic journal backed by files on a filesystem
|
|
type fsJournal struct {
|
|
fi *os.File
|
|
fSize int64
|
|
|
|
journalDir string
|
|
|
|
incoming chan *JournalEntry
|
|
journalSizeLimit int64
|
|
|
|
closing chan struct{}
|
|
}
|
|
|
|
func OpenFSJournal(dir string) (Journal, error) {
|
|
fsj := &fsJournal{
|
|
journalDir: dir,
|
|
incoming: make(chan *JournalEntry, 32),
|
|
journalSizeLimit: 1 << 30,
|
|
closing: make(chan struct{}),
|
|
}
|
|
|
|
if err := fsj.rollJournalFile(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go fsj.runLoop()
|
|
|
|
return fsj, nil
|
|
}
|
|
|
|
// JournalEntry is a single journal record. putEntry serializes it with
// encoding/json, so the exported field names and their declaration order
// determine the shape of each line in the journal file.
type JournalEntry struct {
	System    string      // name of the subsystem that produced the entry
	Timestamp time.Time   // when the entry was created
	Val       interface{} // caller-supplied payload
}
|
|
|
|
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
|
|
b, err := json.Marshal(je)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := fsj.fi.Write(append(b, '\n'))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fsj.fSize += int64(n)
|
|
|
|
if fsj.fSize >= fsj.journalSizeLimit {
|
|
return fsj.rollJournalFile()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RFC3339nocolon is a time.Format layout resembling RFC 3339 but with the
// colons left out. rollJournalFile uses it to timestamp journal file names.
const RFC3339nocolon = "2006-01-02T150405Z0700"
|
|
|
|
func (fsj *fsJournal) rollJournalFile() error {
|
|
if fsj.fi != nil {
|
|
err := fsj.fi.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to open journal file: %w", err)
|
|
}
|
|
|
|
fsj.fi = nfi
|
|
fsj.fSize = 0
|
|
return nil
|
|
}
|
|
|
|
func (fsj *fsJournal) runLoop() {
|
|
for {
|
|
select {
|
|
case je := <-fsj.incoming:
|
|
if err := fsj.putEntry(je); err != nil {
|
|
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
|
|
}
|
|
case <-fsj.closing:
|
|
if err := fsj.fi.Close(); err != nil {
|
|
log.Errorw("failed to close journal", "err", err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
|
|
je := &JournalEntry{
|
|
System: system,
|
|
Timestamp: build.Clock.Now(),
|
|
Val: obj,
|
|
}
|
|
select {
|
|
case fsj.incoming <- je:
|
|
case <-fsj.closing:
|
|
log.Warnw("journal closed but tried to log event", "entry", je)
|
|
}
|
|
}
|
|
|
|
func (fsj *fsJournal) Close() error {
|
|
close(fsj.closing)
|
|
return nil
|
|
}
|