Merge pull request #2101 from filecoin-project/feat/persistent-journal
implement a persistent journal for lotus node operations
This commit is contained in:
commit
99b5ec96a2
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/state"
|
"github.com/filecoin-project/lotus/chain/state"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
@ -324,6 +325,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
journal.Add("sync", map[string]interface{}{
|
||||||
|
"op": "headChange",
|
||||||
|
"from": r.old.Key(),
|
||||||
|
"to": r.new.Key(),
|
||||||
|
"rev": len(revert),
|
||||||
|
"apply": len(apply),
|
||||||
|
})
|
||||||
|
|
||||||
// reverse the apply array
|
// reverse the apply array
|
||||||
for i := len(apply)/2 - 1; i >= 0; i-- {
|
for i := len(apply)/2 - 1; i >= 0; i-- {
|
||||||
opp := len(apply) - 1 - i
|
opp := len(apply) - 1 - i
|
||||||
|
146
journal/journal.go
Normal file
146
journal/journal.go
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
package journal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitializeSystemJournal(dir string) error {
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
j, err := OpenFSJournal(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
currentJournal = j
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Add(sys string, val interface{}) {
|
||||||
|
if currentJournal == nil {
|
||||||
|
log.Warn("no journal configured")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
currentJournal.AddEntry(sys, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
var log = logging.Logger("journal")
|
||||||
|
|
||||||
|
var currentJournal Journal
|
||||||
|
|
||||||
|
type Journal interface {
|
||||||
|
AddEntry(system string, obj interface{})
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// fsJournal is a basic journal backed by files on a filesystem
|
||||||
|
type fsJournal struct {
|
||||||
|
fi *os.File
|
||||||
|
fSize int64
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
journalDir string
|
||||||
|
|
||||||
|
incoming chan *JournalEntry
|
||||||
|
journalSizeLimit int64
|
||||||
|
|
||||||
|
closing chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func OpenFSJournal(dir string) (*fsJournal, error) {
|
||||||
|
fsj := &fsJournal{
|
||||||
|
journalDir: dir,
|
||||||
|
incoming: make(chan *JournalEntry, 32),
|
||||||
|
journalSizeLimit: 1 << 30,
|
||||||
|
closing: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fsj.rollJournalFile(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go fsj.runLoop()
|
||||||
|
|
||||||
|
return fsj, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type JournalEntry struct {
|
||||||
|
System string
|
||||||
|
Timestamp time.Time
|
||||||
|
Val interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
|
||||||
|
b, err := json.Marshal(je)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err := fsj.fi.Write(append(b, '\n'))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fsj.fSize += int64(n)
|
||||||
|
|
||||||
|
if fsj.fSize >= fsj.journalSizeLimit {
|
||||||
|
fsj.rollJournalFile()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsj *fsJournal) rollJournalFile() error {
|
||||||
|
if fsj.fi != nil {
|
||||||
|
fsj.fi.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339))))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to open journal file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fsj.fi = nfi
|
||||||
|
fsj.fSize = 0
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsj *fsJournal) runLoop() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case je := <-fsj.incoming:
|
||||||
|
if err := fsj.putEntry(je); err != nil {
|
||||||
|
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
|
||||||
|
}
|
||||||
|
case <-fsj.closing:
|
||||||
|
fsj.fi.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
|
||||||
|
je := &JournalEntry{
|
||||||
|
System: system,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Val: obj,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case fsj.incoming <- je:
|
||||||
|
case <-fsj.closing:
|
||||||
|
log.Warnw("journal closed but tried to log event", "entry", je)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsj *fsJournal) Close() error {
|
||||||
|
close(fsj.closing)
|
||||||
|
return nil
|
||||||
|
}
|
@ -119,6 +119,7 @@ const (
|
|||||||
ExtractApiKey
|
ExtractApiKey
|
||||||
HeadMetricsKey
|
HeadMetricsKey
|
||||||
RunPeerTaggerKey
|
RunPeerTaggerKey
|
||||||
|
JournalKey
|
||||||
|
|
||||||
SetApiEndpointKey
|
SetApiEndpointKey
|
||||||
|
|
||||||
@ -150,6 +151,7 @@ func defaults() []Option {
|
|||||||
Override(new(record.Validator), modules.RecordValidator),
|
Override(new(record.Validator), modules.RecordValidator),
|
||||||
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
|
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
|
||||||
Override(new(dtypes.ShutdownChan), make(chan struct{})),
|
Override(new(dtypes.ShutdownChan), make(chan struct{})),
|
||||||
|
Override(JournalKey, modules.SetupJournal),
|
||||||
|
|
||||||
// Filecoin modules
|
// Filecoin modules
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/gbrlsnchs/jwt/v3"
|
"github.com/gbrlsnchs/jwt/v3"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
@ -18,6 +19,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api/apistruct"
|
"github.com/filecoin-project/lotus/api/apistruct"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/lib/addrutil"
|
"github.com/filecoin-project/lotus/lib/addrutil"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
|
|||||||
func DrandBootstrap() (dtypes.DrandBootstrap, error) {
|
func DrandBootstrap() (dtypes.DrandBootstrap, error) {
|
||||||
return build.DrandBootstrap()
|
return build.DrandBootstrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetupJournal(lr repo.LockedRepo) error {
|
||||||
|
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user