diff --git a/chain/store/store.go b/chain/store/store.go index 4dabb96f7..f2ebf2b41 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/metrics" "go.opencensus.io/stats" "go.opencensus.io/trace" @@ -324,6 +325,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } + journal.Add("sync", map[string]interface{}{ + "op": "headChange", + "from": r.old.Key(), + "to": r.new.Key(), + "rev": len(revert), + "apply": len(apply), + }) + // reverse the apply array for i := len(apply)/2 - 1; i >= 0; i-- { opp := len(apply) - 1 - i diff --git a/journal/journal.go b/journal/journal.go new file mode 100644 index 000000000..b664e8fa7 --- /dev/null +++ b/journal/journal.go @@ -0,0 +1,146 @@ +package journal + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" +) + +func InitializeSystemJournal(dir string) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + j, err := OpenFSJournal(dir) + if err != nil { + return err + } + currentJournal = j + return nil +} + +func Add(sys string, val interface{}) { + if currentJournal == nil { + log.Warn("no journal configured") + return + } + currentJournal.AddEntry(sys, val) +} + +var log = logging.Logger("journal") + +var currentJournal Journal + +type Journal interface { + AddEntry(system string, obj interface{}) + Close() error +} + +// fsJournal is a basic journal backed by files on a filesystem +type fsJournal struct { + fi *os.File + fSize int64 + + lk sync.Mutex + + journalDir string + + incoming chan *JournalEntry + journalSizeLimit int64 + + closing chan struct{} +} + +func OpenFSJournal(dir string) (*fsJournal, error) { + fsj := &fsJournal{ + journalDir: dir, + incoming: make(chan *JournalEntry, 32), + journalSizeLimit: 1 << 30, + closing: make(chan struct{}), + } + + if err := fsj.rollJournalFile(); err != nil { + return nil, err + } + + go fsj.runLoop() + + return fsj, nil +} + +type JournalEntry struct { + System string + Timestamp time.Time + Val interface{} +} + +func (fsj *fsJournal) putEntry(je *JournalEntry) error { + b, err := json.Marshal(je) + if err != nil { + return err + } + n, err := fsj.fi.Write(append(b, '\n')) + if err != nil { + return err + } + + fsj.fSize += int64(n) + + if fsj.fSize >= fsj.journalSizeLimit { + fsj.rollJournalFile() + } + + return nil +} + +func (fsj *fsJournal) rollJournalFile() error { + if fsj.fi != nil { + fsj.fi.Close() + } + + nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339)))) + if err != nil { + return xerrors.Errorf("failed to open journal file: %w", err) + } + + fsj.fi = nfi + fsj.fSize = 0 + return nil +} + +func (fsj *fsJournal) runLoop() { + for { + select { + case je := <-fsj.incoming: + if err := fsj.putEntry(je); err != nil { + log.Errorw("failed to write out journal entry", "entry", je, "err", err) + } + case <-fsj.closing: + fsj.fi.Close() + return + } + } +} + +func (fsj *fsJournal) AddEntry(system string, obj interface{}) { + je := &JournalEntry{ + System: system, + Timestamp: time.Now(), + Val: obj, + } + select { + case fsj.incoming <- je: + case <-fsj.closing: + log.Warnw("journal closed but tried to log event", "entry", je) + } +} + +func (fsj *fsJournal) Close() error { + close(fsj.closing) + return nil +} diff --git a/node/builder.go b/node/builder.go index 2ffe88921..5246028e7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -119,6 +119,7 @@ const ( ExtractApiKey HeadMetricsKey RunPeerTaggerKey + JournalKey SetApiEndpointKey @@ -150,6 +151,7 @@ func defaults() []Option { Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), + Override(JournalKey, modules.SetupJournal), // Filecoin modules diff --git a/node/modules/core.go b/node/modules/core.go index ca9872d90..84179bd63 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -6,6 +6,7 @@ import ( "errors" "io" "io/ioutil" + "path/filepath" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" @@ -18,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/addrutil" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { func DrandBootstrap() (dtypes.DrandBootstrap, error) { return build.DrandBootstrap() } + +func SetupJournal(lr repo.LockedRepo) error { + return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal")) +}