untangle fsjournal dependencies
This commit is contained in:
parent
5366821144
commit
b094e0913d
@ -51,6 +51,7 @@ import (
|
|||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
"github.com/filecoin-project/lotus/genesis"
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
||||||
storageminer "github.com/filecoin-project/lotus/miner"
|
storageminer "github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
@ -479,7 +480,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
j, err := fsjournal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open filesystem journal: %w", err)
|
return fmt.Errorf("failed to open filesystem journal: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
"github.com/filecoin-project/lotus/lib/ulimit"
|
"github.com/filecoin-project/lotus/lib/ulimit"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
@ -476,7 +477,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
j, err := fsjournal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to open journal: %w", err)
|
return xerrors.Errorf("failed to open journal: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package journal
|
package fsjournal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@ -6,17 +6,21 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("fsjournal")
|
||||||
|
|
||||||
const RFC3339nocolon = "2006-01-02T150405Z0700"
|
const RFC3339nocolon = "2006-01-02T150405Z0700"
|
||||||
|
|
||||||
// fsJournal is a basic journal backed by files on a filesystem.
|
// fsJournal is a basic journal backed by files on a filesystem.
|
||||||
type fsJournal struct {
|
type fsJournal struct {
|
||||||
EventTypeRegistry
|
journal.EventTypeRegistry
|
||||||
|
|
||||||
dir string
|
dir string
|
||||||
sizeLimit int64
|
sizeLimit int64
|
||||||
@ -24,7 +28,7 @@ type fsJournal struct {
|
|||||||
fi *os.File
|
fi *os.File
|
||||||
fSize int64
|
fSize int64
|
||||||
|
|
||||||
incoming chan *Event
|
incoming chan *journal.Event
|
||||||
|
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
@ -32,17 +36,17 @@ type fsJournal struct {
|
|||||||
|
|
||||||
// OpenFSJournal constructs a rolling filesystem journal, with a default
|
// OpenFSJournal constructs a rolling filesystem journal, with a default
|
||||||
// per-file size limit of 1GiB.
|
// per-file size limit of 1GiB.
|
||||||
func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) {
|
func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal.Journal, error) {
|
||||||
dir := filepath.Join(lr.Path(), "journal")
|
dir := filepath.Join(lr.Path(), "journal")
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
|
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
f := &fsJournal{
|
f := &fsJournal{
|
||||||
EventTypeRegistry: NewEventTypeRegistry(disabled),
|
EventTypeRegistry: journal.NewEventTypeRegistry(disabled),
|
||||||
dir: dir,
|
dir: dir,
|
||||||
sizeLimit: 1 << 30,
|
sizeLimit: 1 << 30,
|
||||||
incoming: make(chan *Event, 32),
|
incoming: make(chan *journal.Event, 32),
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -56,7 +60,7 @@ func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error)
|
|||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fsJournal) RecordEvent(evtType EventType, supplier func() interface{}) {
|
func (f *fsJournal) RecordEvent(evtType journal.EventType, supplier func() interface{}) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r)
|
log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r)
|
||||||
@ -67,7 +71,7 @@ func (f *fsJournal) RecordEvent(evtType EventType, supplier func() interface{})
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
je := &Event{
|
je := &journal.Event{
|
||||||
EventType: evtType,
|
EventType: evtType,
|
||||||
Timestamp: build.Clock.Now(),
|
Timestamp: build.Clock.Now(),
|
||||||
Data: supplier(),
|
Data: supplier(),
|
||||||
@ -85,7 +89,7 @@ func (f *fsJournal) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fsJournal) putEvent(evt *Event) error {
|
func (f *fsJournal) putEvent(evt *journal.Event) error {
|
||||||
b, err := json.Marshal(evt)
|
b, err := json.Marshal(evt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
@ -237,7 +238,7 @@ func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Sche
|
|||||||
}
|
}
|
||||||
|
|
||||||
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
||||||
jrnl, err := journal.OpenFSJournal(lr, disabled)
|
jrnl, err := fsjournal.OpenFSJournal(lr, disabled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user