Merge pull request #4275 from filecoin-project/steb/unshare-journal
unshare the journal
This commit is contained in:
commit
d4c92942e7
@ -39,6 +39,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
"github.com/filecoin-project/lotus/genesis"
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
@ -122,6 +123,7 @@ var DefaultRemainderAccountActor = genesis.Actor{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
||||||
|
j := journal.NilJournal()
|
||||||
// TODO: we really shouldn't modify a global variable here.
|
// TODO: we really shouldn't modify a global variable here.
|
||||||
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
|
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
|
||||||
|
|
||||||
@ -229,12 +231,12 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
|||||||
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
|
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
|
||||||
}
|
}
|
||||||
|
|
||||||
genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)
|
genb, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, sys, tpl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, ds, sys)
|
cs := store.NewChainStore(bs, ds, sys, j)
|
||||||
|
|
||||||
genfb := &types.FullBlock{Header: genb.Genesis}
|
genfb := &types.FullBlock{Header: genb.Genesis}
|
||||||
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
|
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -466,7 +467,10 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
|
|||||||
return st, nil
|
return st, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallBuilder, template genesis.Template) (*GenesisBootstrap, error) {
|
func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blockstore, sys vm.SyscallBuilder, template genesis.Template) (*GenesisBootstrap, error) {
|
||||||
|
if j == nil {
|
||||||
|
j = journal.NilJournal()
|
||||||
|
}
|
||||||
st, keyIDs, err := MakeInitialStateTree(ctx, bs, template)
|
st, keyIDs, err := MakeInitialStateTree(ctx, bs, template)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("make initial state tree failed: %w", err)
|
return nil, xerrors.Errorf("make initial state tree failed: %w", err)
|
||||||
@ -478,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallB
|
|||||||
}
|
}
|
||||||
|
|
||||||
// temp chainstore
|
// temp chainstore
|
||||||
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys)
|
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j)
|
||||||
|
|
||||||
// Verify PreSealed Data
|
// Verify PreSealed Data
|
||||||
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)
|
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)
|
||||||
|
@ -159,6 +159,7 @@ type MessagePool struct {
|
|||||||
sigValCache *lru.TwoQueueCache
|
sigValCache *lru.TwoQueueCache
|
||||||
|
|
||||||
evtTypes [3]journal.EventType
|
evtTypes [3]journal.EventType
|
||||||
|
journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
type msgSet struct {
|
type msgSet struct {
|
||||||
@ -316,7 +317,7 @@ func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
|
|||||||
return types.BigInt{Int: requiredFunds}
|
return types.BigInt{Int: requiredFunds}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
||||||
|
|
||||||
@ -325,6 +326,10 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
return nil, xerrors.Errorf("error loading mpool config: %w", err)
|
return nil, xerrors.Errorf("error loading mpool config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if j == nil {
|
||||||
|
j = journal.NilJournal()
|
||||||
|
}
|
||||||
|
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
ds: ds,
|
ds: ds,
|
||||||
addSema: make(chan struct{}, 1),
|
addSema: make(chan struct{}, 1),
|
||||||
@ -344,10 +349,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
netName: netName,
|
netName: netName,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
evtTypes: [...]journal.EventType{
|
evtTypes: [...]journal.EventType{
|
||||||
evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"),
|
evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"),
|
||||||
evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"),
|
evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
|
||||||
evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"),
|
evtTypeMpoolRepub: j.RegisterEventType("mpool", "repub"),
|
||||||
},
|
},
|
||||||
|
journal: j,
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable initial prunes
|
// enable initial prunes
|
||||||
@ -744,7 +750,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
|
|||||||
Message: m,
|
Message: m,
|
||||||
}, localUpdates)
|
}, localUpdates)
|
||||||
|
|
||||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
|
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
|
||||||
return MessagePoolEvt{
|
return MessagePoolEvt{
|
||||||
Action: "add",
|
Action: "add",
|
||||||
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
|
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
|
||||||
@ -865,7 +871,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
|
|||||||
Message: m,
|
Message: m,
|
||||||
}, localUpdates)
|
}, localUpdates)
|
||||||
|
|
||||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
|
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
|
||||||
return MessagePoolEvt{
|
return MessagePoolEvt{
|
||||||
Action: "remove",
|
Action: "remove",
|
||||||
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}
|
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}
|
||||||
|
@ -225,7 +225,7 @@ func TestMessagePool(t *testing.T) {
|
|||||||
|
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -266,7 +266,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
|
|||||||
|
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -315,7 +315,7 @@ func TestRevertMessages(t *testing.T) {
|
|||||||
|
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -378,7 +378,7 @@ func TestPruningSimple(t *testing.T) {
|
|||||||
|
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -422,7 +422,7 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -465,7 +465,7 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp, err = New(tma, ds, "mptest")
|
mp, err = New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -494,7 +494,7 @@ func TestClearAll(t *testing.T) {
|
|||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -548,7 +548,7 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -609,7 +609,7 @@ func TestUpdates(t *testing.T) {
|
|||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -148,7 +147,7 @@ loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(msgs) > 0 {
|
if len(msgs) > 0 {
|
||||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
|
mp.journal.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
|
||||||
msgsEv := make([]MessagePoolEvtMessage, 0, len(msgs))
|
msgsEv := make([]MessagePoolEvtMessage, 0, len(msgs))
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
msgsEv = append(msgsEv, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
|
msgsEv = append(msgsEv, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
|
||||||
|
@ -21,7 +21,7 @@ func TestRepubMessages(t *testing.T) {
|
|||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
mp, err := New(tma, ds, "mptest")
|
mp, err := New(tma, ds, "mptest", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, g
|
|||||||
func makeTestMpool() (*MessagePool, *testMpoolAPI) {
|
func makeTestMpool() (*MessagePool, *testMpoolAPI) {
|
||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
mp, err := New(tma, ds, "test")
|
mp, err := New(tma, ds, "test", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) {
|
|||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
|
|
||||||
nbs := blockstore.NewTemporarySync()
|
nbs := blockstore.NewTemporarySync()
|
||||||
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil)
|
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
|
||||||
|
|
||||||
_, err = cs.Import(bytes.NewReader(gencar))
|
_, err = cs.Import(bytes.NewReader(gencar))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -124,11 +124,15 @@ type ChainStore struct {
|
|||||||
vmcalls vm.SyscallBuilder
|
vmcalls vm.SyscallBuilder
|
||||||
|
|
||||||
evtTypes [1]journal.EventType
|
evtTypes [1]journal.EventType
|
||||||
|
journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
|
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||||
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||||
|
if j == nil {
|
||||||
|
j = journal.NilJournal()
|
||||||
|
}
|
||||||
cs := &ChainStore{
|
cs := &ChainStore{
|
||||||
bs: bs,
|
bs: bs,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
@ -137,10 +141,11 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
|
|||||||
mmCache: c,
|
mmCache: c,
|
||||||
tsCache: tsc,
|
tsCache: tsc,
|
||||||
vmcalls: vmcalls,
|
vmcalls: vmcalls,
|
||||||
|
journal: j,
|
||||||
}
|
}
|
||||||
|
|
||||||
cs.evtTypes = [1]journal.EventType{
|
cs.evtTypes = [1]journal.EventType{
|
||||||
evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"),
|
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
|
||||||
}
|
}
|
||||||
|
|
||||||
ci := NewChainIndex(cs.LoadTipSet)
|
ci := NewChainIndex(cs.LoadTipSet)
|
||||||
@ -379,7 +384,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
|
cs.journal.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
|
||||||
return HeadChangeEvt{
|
return HeadChangeEvt{
|
||||||
From: r.old.Key(),
|
From: r.old.Key(),
|
||||||
FromHeight: r.old.Height(),
|
FromHeight: r.old.Height(),
|
||||||
|
@ -62,7 +62,7 @@ func BenchmarkGetRandomness(b *testing.B) {
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(bds)
|
bs := blockstore.NewBlockstore(bds)
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, mds, nil)
|
cs := store.NewChainStore(bs, mds, nil, nil)
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
@ -96,7 +96,7 @@ func TestChainExportImport(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
nbs := blockstore.NewTemporary()
|
nbs := blockstore.NewTemporary()
|
||||||
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil)
|
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
|
||||||
|
|
||||||
root, err := cs.Import(buf)
|
root, err := cs.Import(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -183,7 +183,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier))
|
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier), nil)
|
||||||
stm := stmgr.NewStateManager(cs)
|
stm := stmgr.NewStateManager(cs)
|
||||||
|
|
||||||
if cctx.Bool("global-profile") {
|
if cctx.Bool("global-profile") {
|
||||||
|
@ -173,7 +173,7 @@ var chainBalanceStateCmd = &cli.Command{
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(ds)
|
bs := blockstore.NewBlockstore(ds)
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
|
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
|
|
||||||
cst := cbor.NewCborStore(bs)
|
cst := cbor.NewCborStore(bs)
|
||||||
store := adt.WrapStore(ctx, cst)
|
store := adt.WrapStore(ctx, cst)
|
||||||
@ -343,7 +343,7 @@ var chainPledgeCmd = &cli.Command{
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(ds)
|
bs := blockstore.NewBlockstore(ds)
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
|
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
|
|
||||||
cst := cbor.NewCborStore(bs)
|
cst := cbor.NewCborStore(bs)
|
||||||
store := adt.WrapStore(ctx, cst)
|
store := adt.WrapStore(ctx, cst)
|
||||||
|
@ -83,7 +83,7 @@ var exportChainCmd = &cli.Command{
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(ds)
|
bs := blockstore.NewBlockstore(ds)
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, mds, nil)
|
cs := store.NewChainStore(bs, mds, nil, nil)
|
||||||
if err := cs.Load(); err != nil {
|
if err := cs.Load(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
|
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil)
|
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil)
|
||||||
|
|
||||||
cf := cctx.Args().Get(0)
|
cf := cctx.Args().Get(0)
|
||||||
f, err := os.Open(cf)
|
f, err := os.Open(cf)
|
||||||
|
@ -162,7 +162,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(ds)
|
bs := blockstore.NewBlockstore(ds)
|
||||||
|
|
||||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
|
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
if err := cs.Load(); err != nil {
|
if err := cs.Load(); err != nil {
|
||||||
return fmt.Errorf("loading chainstore: %w", err)
|
return fmt.Errorf("loading chainstore: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -464,13 +464,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil {
|
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
||||||
journal.J = jrnl
|
if err != nil {
|
||||||
} else {
|
|
||||||
return fmt.Errorf("failed to open filesystem journal: %w", err)
|
return fmt.Errorf("failed to open filesystem journal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m := miner.NewMiner(api, epp, a, slashfilter.New(mds))
|
m := miner.NewMiner(api, epp, a, slashfilter.New(mds), j)
|
||||||
{
|
{
|
||||||
if err := m.Start(ctx); err != nil {
|
if err := m.Start(ctx); err != nil {
|
||||||
return xerrors.Errorf("failed to start up genesis miner: %w", err)
|
return xerrors.Errorf("failed to start up genesis miner: %w", err)
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
"github.com/filecoin-project/lotus/lib/ulimit"
|
"github.com/filecoin-project/lotus/lib/ulimit"
|
||||||
@ -410,7 +411,11 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
|
|
||||||
bs := blockstore.NewBlockstore(ds)
|
bs := blockstore.NewBlockstore(ds)
|
||||||
|
|
||||||
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
|
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to open journal: %w", err)
|
||||||
|
}
|
||||||
|
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
|
||||||
|
|
||||||
log.Infof("importing chain from %s...", fname)
|
log.Infof("importing chain from %s...", fname)
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
|
|||||||
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
|
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
|
||||||
vmRand = NewFixedRand()
|
vmRand = NewFixedRand()
|
||||||
|
|
||||||
cs = store.NewChainStore(bs, ds, syscalls)
|
cs = store.NewChainStore(bs, ds, syscalls, nil)
|
||||||
sm = stmgr.NewStateManager(cs)
|
sm = stmgr.NewStateManager(cs)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
19
journal/env.go
Normal file
19
journal/env.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package journal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// envJournalDisabledEvents is the environment variable through which disabled
|
||||||
|
// journal events can be customized.
|
||||||
|
const envDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
|
||||||
|
|
||||||
|
func EnvDisabledEvents() DisabledEvents {
|
||||||
|
if env, ok := os.LookupEnv(envDisabledEvents); ok {
|
||||||
|
if ret, err := ParseDisabledEvents(env); err == nil {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// fallback if env variable is not set, or if it failed to parse.
|
||||||
|
return DefaultDisabledEvents
|
||||||
|
}
|
@ -1,9 +0,0 @@
|
|||||||
package journal
|
|
||||||
|
|
||||||
var (
|
|
||||||
// J is a globally accessible Journal. It starts being NilJournal, and early
|
|
||||||
// during the Lotus initialization routine, it is reset to whichever Journal
|
|
||||||
// is configured (by default, the filesystem journal). Components can safely
|
|
||||||
// record in the journal by calling: journal.J.RecordEvent(...).
|
|
||||||
J Journal = NilJournal() // nolint
|
|
||||||
)
|
|
@ -28,9 +28,9 @@ type RetrievalProviderEvt struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StorageClientJournaler records journal events from the storage client.
|
// StorageClientJournaler records journal events from the storage client.
|
||||||
func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
func StorageClientJournaler(j journal.Journal, evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
||||||
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
||||||
journal.J.RecordEvent(evtType, func() interface{} {
|
j.RecordEvent(evtType, func() interface{} {
|
||||||
return StorageClientEvt{
|
return StorageClientEvt{
|
||||||
Event: storagemarket.ClientEvents[event],
|
Event: storagemarket.ClientEvents[event],
|
||||||
Deal: deal,
|
Deal: deal,
|
||||||
@ -40,9 +40,9 @@ func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StorageProviderJournaler records journal events from the storage provider.
|
// StorageProviderJournaler records journal events from the storage provider.
|
||||||
func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
func StorageProviderJournaler(j journal.Journal, evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||||
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||||
journal.J.RecordEvent(evtType, func() interface{} {
|
j.RecordEvent(evtType, func() interface{} {
|
||||||
return StorageProviderEvt{
|
return StorageProviderEvt{
|
||||||
Event: storagemarket.ProviderEvents[event],
|
Event: storagemarket.ProviderEvents[event],
|
||||||
Deal: deal,
|
Deal: deal,
|
||||||
@ -52,9 +52,9 @@ func StorageProviderJournaler(evtType journal.EventType) func(event storagemarke
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalClientJournaler records journal events from the retrieval client.
|
// RetrievalClientJournaler records journal events from the retrieval client.
|
||||||
func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
func RetrievalClientJournaler(j journal.Journal, evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
||||||
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
||||||
journal.J.RecordEvent(evtType, func() interface{} {
|
j.RecordEvent(evtType, func() interface{} {
|
||||||
return RetrievalClientEvt{
|
return RetrievalClientEvt{
|
||||||
Event: retrievalmarket.ClientEvents[event],
|
Event: retrievalmarket.ClientEvents[event],
|
||||||
Deal: deal,
|
Deal: deal,
|
||||||
@ -64,9 +64,9 @@ func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmar
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalProviderJournaler records journal events from the retrieval provider.
|
// RetrievalProviderJournaler records journal events from the retrieval provider.
|
||||||
func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
func RetrievalProviderJournaler(j journal.Journal, evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
||||||
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
||||||
journal.J.RecordEvent(evtType, func() interface{} {
|
j.RecordEvent(evtType, func() interface{} {
|
||||||
return RetrievalProviderEvt{
|
return RetrievalProviderEvt{
|
||||||
Event: retrievalmarket.ProviderEvents[event],
|
Event: retrievalmarket.ProviderEvents[event],
|
||||||
Deal: deal,
|
Deal: deal,
|
||||||
|
@ -49,7 +49,7 @@ func randTimeOffset(width time.Duration) time.Duration {
|
|||||||
return val - (width / 2)
|
return val - (width / 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter) *Miner {
|
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, j journal.Journal) *Miner {
|
||||||
arc, err := lru.NewARC(10000)
|
arc, err := lru.NewARC(10000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -74,8 +74,9 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address,
|
|||||||
sf: sf,
|
sf: sf,
|
||||||
minedBlockHeights: arc,
|
minedBlockHeights: arc,
|
||||||
evtTypes: [...]journal.EventType{
|
evtTypes: [...]journal.EventType{
|
||||||
evtTypeBlockMined: journal.J.RegisterEventType("miner", "block_mined"),
|
evtTypeBlockMined: j.RegisterEventType("miner", "block_mined"),
|
||||||
},
|
},
|
||||||
|
journal: j,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,6 +98,7 @@ type Miner struct {
|
|||||||
minedBlockHeights *lru.ARCCache
|
minedBlockHeights *lru.ARCCache
|
||||||
|
|
||||||
evtTypes [1]journal.EventType
|
evtTypes [1]journal.EventType
|
||||||
|
journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Address() address.Address {
|
func (m *Miner) Address() address.Address {
|
||||||
@ -239,7 +241,7 @@ minerLoop:
|
|||||||
onDone(b != nil, h, nil)
|
onDone(b != nil, h, nil)
|
||||||
|
|
||||||
if b != nil {
|
if b != nil {
|
||||||
journal.J.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
|
m.journal.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
|
||||||
return map[string]interface{}{
|
return map[string]interface{}{
|
||||||
"parents": base.TipSet.Cids(),
|
"parents": base.TipSet.Cids(),
|
||||||
"nulls": base.NullRounds,
|
"nulls": base.NullRounds,
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MineReq struct {
|
type MineReq struct {
|
||||||
@ -32,6 +33,7 @@ func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(api.FullNode
|
|||||||
minedBlockHeights: arc,
|
minedBlockHeights: arc,
|
||||||
address: addr,
|
address: addr,
|
||||||
sf: slashfilter.New(ds.NewMapDatastore()),
|
sf: slashfilter.New(ds.NewMapDatastore()),
|
||||||
|
journal: journal.NilJournal(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.Start(context.TODO()); err != nil {
|
if err := m.Start(context.TODO()); err != nil {
|
||||||
|
@ -3,7 +3,6 @@ package node
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
@ -73,10 +72,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EnvJournalDisabledEvents is the environment variable through which disabled
|
|
||||||
// journal events can be customized.
|
|
||||||
const EnvJournalDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
|
|
||||||
|
|
||||||
//nolint:deadcode,varcheck
|
//nolint:deadcode,varcheck
|
||||||
var log = logging.Logger("builder")
|
var log = logging.Logger("builder")
|
||||||
|
|
||||||
@ -166,19 +161,8 @@ type Settings struct {
|
|||||||
func defaults() []Option {
|
func defaults() []Option {
|
||||||
return []Option{
|
return []Option{
|
||||||
// global system journal.
|
// global system journal.
|
||||||
Override(new(journal.DisabledEvents), func() journal.DisabledEvents {
|
Override(new(journal.DisabledEvents), journal.EnvDisabledEvents),
|
||||||
if env, ok := os.LookupEnv(EnvJournalDisabledEvents); ok {
|
|
||||||
if ret, err := journal.ParseDisabledEvents(env); err == nil {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// fallback if env variable is not set, or if it failed to parse.
|
|
||||||
return journal.DefaultDisabledEvents
|
|
||||||
}),
|
|
||||||
Override(new(journal.Journal), modules.OpenFilesystemJournal),
|
Override(new(journal.Journal), modules.OpenFilesystemJournal),
|
||||||
Override(InitJournalKey, func(j journal.Journal) {
|
|
||||||
journal.J = j // eagerly sets the global journal through fx.Invoke.
|
|
||||||
}),
|
|
||||||
|
|
||||||
Override(new(helpers.MetricsCtx), context.Background),
|
Override(new(helpers.MetricsCtx), context.Background),
|
||||||
Override(new(record.Validator), modules.RecordValidator),
|
Override(new(record.Validator), modules.RecordValidator),
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
@ -51,9 +52,9 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
|
|||||||
return exch
|
return exch
|
||||||
}
|
}
|
||||||
|
|
||||||
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName) (*messagepool.MessagePool, error) {
|
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
|
||||||
mpp := messagepool.NewProvider(sm, ps)
|
mpp := messagepool.NewProvider(sm, ps)
|
||||||
mp, err := messagepool.New(mpp, ds, nn)
|
mp, err := messagepool.New(mpp, ds, nn, j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
||||||
}
|
}
|
||||||
@ -88,8 +89,8 @@ func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtype
|
|||||||
return blockservice.New(bs, rem)
|
return blockservice.New(bs, rem)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder) *store.ChainStore {
|
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||||
chain := store.NewChainStore(bs, ds, syscalls)
|
chain := store.NewChainStore(bs, ds, syscalls, j)
|
||||||
|
|
||||||
if err := chain.Load(); err != nil {
|
if err := chain.Load(); err != nil {
|
||||||
log.Warnf("loading chain state from disk: %s", err)
|
log.Warnf("loading chain state from disk: %s", err)
|
||||||
|
@ -113,7 +113,7 @@ func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {
|
|||||||
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
|
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds) (storagemarket.StorageClient, error) {
|
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds, j journal.Journal) (storagemarket.StorageClient, error) {
|
||||||
net := smnet.NewFromLibp2pHost(h)
|
net := smnet.NewFromLibp2pHost(h)
|
||||||
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second))
|
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -124,8 +124,8 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
|
|||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
c.SubscribeToEvents(marketevents.StorageClientLogger)
|
c.SubscribeToEvents(marketevents.StorageClientLogger)
|
||||||
|
|
||||||
evtType := journal.J.RegisterEventType("markets/storage/client", "state_change")
|
evtType := j.RegisterEventType("markets/storage/client", "state_change")
|
||||||
c.SubscribeToEvents(markets.StorageClientJournaler(evtType))
|
c.SubscribeToEvents(markets.StorageClientJournaler(j, evtType))
|
||||||
|
|
||||||
return c.Start(ctx)
|
return c.Start(ctx)
|
||||||
},
|
},
|
||||||
@ -137,7 +137,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
||||||
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI) (retrievalmarket.RetrievalClient, error) {
|
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
||||||
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
|
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
|
||||||
network := rmnet.NewFromLibp2pHost(h)
|
network := rmnet.NewFromLibp2pHost(h)
|
||||||
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
|
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
|
||||||
@ -150,8 +150,8 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
|
|||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
||||||
|
|
||||||
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
|
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
|
||||||
client.SubscribeToEvents(markets.RetrievalClientJournaler(evtType))
|
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
|
||||||
|
|
||||||
return client.Start(ctx)
|
return client.Start(ctx)
|
||||||
},
|
},
|
||||||
|
@ -161,6 +161,7 @@ type StorageMinerParams struct {
|
|||||||
SectorIDCounter sealing.SectorIDCounter
|
SectorIDCounter sealing.SectorIDCounter
|
||||||
Verifier ffiwrapper.Verifier
|
Verifier ffiwrapper.Verifier
|
||||||
GetSealingConfigFn dtypes.GetSealingConfigFunc
|
GetSealingConfigFn dtypes.GetSealingConfigFunc
|
||||||
|
Journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
|
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
|
||||||
@ -175,6 +176,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
|
|||||||
sc = params.SectorIDCounter
|
sc = params.SectorIDCounter
|
||||||
verif = params.Verifier
|
verif = params.Verifier
|
||||||
gsd = params.GetSealingConfigFn
|
gsd = params.GetSealingConfigFn
|
||||||
|
j = params.Journal
|
||||||
)
|
)
|
||||||
|
|
||||||
maddr, err := minerAddrFromDS(ds)
|
maddr, err := minerAddrFromDS(ds)
|
||||||
@ -194,12 +196,12 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, maddr, worker)
|
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr, worker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc)
|
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc, j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -216,15 +218,15 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
|
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) {
|
||||||
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
|
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
|
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
|
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
|
||||||
|
|
||||||
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
|
evtType := j.RegisterEventType("markets/retrieval/provider", "state_change")
|
||||||
m.SubscribeToEvents(markets.RetrievalProviderJournaler(evtType))
|
m.SubscribeToEvents(markets.RetrievalProviderJournaler(j, evtType))
|
||||||
|
|
||||||
return m.Start(ctx)
|
return m.Start(ctx)
|
||||||
},
|
},
|
||||||
@ -234,15 +236,15 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) {
|
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider, j journal.Journal) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
h.OnReady(marketevents.ReadyLogger("storage provider"))
|
h.OnReady(marketevents.ReadyLogger("storage provider"))
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(context.Context) error {
|
OnStart: func(context.Context) error {
|
||||||
h.SubscribeToEvents(marketevents.StorageProviderLogger)
|
h.SubscribeToEvents(marketevents.StorageProviderLogger)
|
||||||
|
|
||||||
evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change")
|
evtType := j.RegisterEventType("markets/storage/provider", "state_change")
|
||||||
h.SubscribeToEvents(markets.StorageProviderJournaler(evtType))
|
h.SubscribeToEvents(markets.StorageProviderJournaler(j, evtType))
|
||||||
|
|
||||||
return h.Start(ctx)
|
return h.Start(ctx)
|
||||||
},
|
},
|
||||||
@ -354,13 +356,13 @@ func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Stagi
|
|||||||
return gs
|
return gs
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter) (*miner.Miner, error) {
|
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*miner.Miner, error) {
|
||||||
minerAddr, err := minerAddrFromDS(ds)
|
minerAddr, err := minerAddrFromDS(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m := miner.NewMiner(api, epp, minerAddr, sf)
|
m := miner.NewMiner(api, epp, minerAddr, sf, j)
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
|
@ -23,17 +23,18 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/genesis"
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
var glog = logging.Logger("genesis")
|
var glog = logging.Logger("genesis")
|
||||||
|
|
||||||
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
|
||||||
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
|
||||||
return func() (*types.BlockHeader, error) {
|
return func() (*types.BlockHeader, error) {
|
||||||
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
||||||
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)
|
b, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, syscalls, template)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
||||||
}
|
}
|
||||||
@ -50,8 +51,8 @@ func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.Cha
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
|
||||||
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) modules.Genesis {
|
||||||
return func() (*types.BlockHeader, error) {
|
return func() (*types.BlockHeader, error) {
|
||||||
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
||||||
genesisTemplate, err := homedir.Expand(genesisTemplate)
|
genesisTemplate, err := homedir.Expand(genesisTemplate)
|
||||||
@ -73,7 +74,7 @@ func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore
|
|||||||
template.Timestamp = uint64(build.Clock.Now().Unix())
|
template.Timestamp = uint64(build.Clock.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)
|
b, err := genesis2.MakeGenesisBlock(context.TODO(), j, bs, syscalls, template)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("make genesis block: %w", err)
|
return nil, xerrors.Errorf("make genesis block: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -57,6 +57,8 @@ type Miner struct {
|
|||||||
sealing *sealing.Sealing
|
sealing *sealing.Sealing
|
||||||
|
|
||||||
sealingEvtType journal.EventType
|
sealingEvtType journal.EventType
|
||||||
|
|
||||||
|
journal journal.Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
// SealingStateEvt is a journal event that records a sector state transition.
|
// SealingStateEvt is a journal event that records a sector state transition.
|
||||||
@ -110,7 +112,7 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
|
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) {
|
||||||
m := &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
feeCfg: feeCfg,
|
feeCfg: feeCfg,
|
||||||
@ -123,7 +125,8 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
|
|||||||
maddr: maddr,
|
maddr: maddr,
|
||||||
worker: worker,
|
worker: worker,
|
||||||
getSealConfig: gsd,
|
getSealConfig: gsd,
|
||||||
sealingEvtType: journal.J.RegisterEventType("storage", "sealing_states"),
|
journal: journal,
|
||||||
|
sealingEvtType: journal.RegisterEventType("storage", "sealing_states"),
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
@ -156,7 +159,7 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
|
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
|
||||||
journal.J.RecordEvent(m.sealingEvtType, func() interface{} {
|
m.journal.RecordEvent(m.sealingEvtType, func() interface{} {
|
||||||
return SealingStateEvt{
|
return SealingStateEvt{
|
||||||
SectorNumber: before.SectorNumber,
|
SectorNumber: before.SectorNumber,
|
||||||
SectorType: before.SectorType,
|
SectorType: before.SectorType,
|
||||||
|
@ -26,11 +26,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
|
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) {
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||||
c := evtCommon{Error: err}
|
c := evtCommon{Error: err}
|
||||||
if ts != nil {
|
if ts != nil {
|
||||||
c.Deadline = deadline
|
c.Deadline = deadline
|
||||||
@ -54,7 +53,7 @@ func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dl
|
|||||||
// recordProofsEvent records a successful proofs_processed event in the
|
// recordProofsEvent records a successful proofs_processed event in the
|
||||||
// journal, even if it was a noop (no partitions).
|
// journal, even if it was a noop (no partitions).
|
||||||
func (s *WindowPoStScheduler) recordProofsEvent(partitions []miner.PoStPartition, mcid cid.Cid) {
|
func (s *WindowPoStScheduler) recordProofsEvent(partitions []miner.PoStPartition, mcid cid.Cid) {
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
|
||||||
return &WdPoStProofsProcessedEvt{
|
return &WdPoStProofsProcessedEvt{
|
||||||
evtCommon: s.getEvtCommon(nil),
|
evtCommon: s.getEvtCommon(nil),
|
||||||
Partitions: partitions,
|
Partitions: partitions,
|
||||||
@ -74,7 +73,7 @@ func (s *WindowPoStScheduler) startGeneratePoST(
|
|||||||
go func() {
|
go func() {
|
||||||
defer abort()
|
defer abort()
|
||||||
|
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||||
return WdPoStSchedulerEvt{
|
return WdPoStSchedulerEvt{
|
||||||
evtCommon: s.getEvtCommon(nil),
|
evtCommon: s.getEvtCommon(nil),
|
||||||
State: SchedulerStateStarted,
|
State: SchedulerStateStarted,
|
||||||
@ -125,7 +124,7 @@ func (s *WindowPoStScheduler) startSubmitPoST(
|
|||||||
|
|
||||||
err := s.runSubmitPoST(ctx, ts, deadline, posts)
|
err := s.runSubmitPoST(ctx, ts, deadline, posts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||||
return WdPoStSchedulerEvt{
|
return WdPoStSchedulerEvt{
|
||||||
evtCommon: s.getEvtCommon(nil),
|
evtCommon: s.getEvtCommon(nil),
|
||||||
State: SchedulerStateSucceeded,
|
State: SchedulerStateSucceeded,
|
||||||
@ -439,7 +438,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
|||||||
log.Errorf("checking sector recoveries: %v", err)
|
log.Errorf("checking sector recoveries: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
||||||
j := WdPoStRecoveriesProcessedEvt{
|
j := WdPoStRecoveriesProcessedEvt{
|
||||||
evtCommon: s.getEvtCommon(err),
|
evtCommon: s.getEvtCommon(err),
|
||||||
Declarations: recoveries,
|
Declarations: recoveries,
|
||||||
@ -458,7 +457,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
|||||||
log.Errorf("checking sector faults: %v", err)
|
log.Errorf("checking sector faults: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
|
||||||
return WdPoStFaultsProcessedEvt{
|
return WdPoStFaultsProcessedEvt{
|
||||||
evtCommon: s.getEvtCommon(err),
|
evtCommon: s.getEvtCommon(err),
|
||||||
Declarations: faults,
|
Declarations: faults,
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/journal"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockStorageMinerAPI struct {
|
type mockStorageMinerAPI struct {
|
||||||
@ -168,6 +169,7 @@ func TestWDPostDoPost(t *testing.T) {
|
|||||||
proofType: proofType,
|
proofType: proofType,
|
||||||
actor: postAct,
|
actor: postAct,
|
||||||
worker: workerAct,
|
worker: workerAct,
|
||||||
|
journal: journal.NilJournal(),
|
||||||
}
|
}
|
||||||
|
|
||||||
di := &dline.Info{
|
di := &dline.Info{
|
||||||
|
@ -35,12 +35,13 @@ type WindowPoStScheduler struct {
|
|||||||
worker address.Address
|
worker address.Address
|
||||||
|
|
||||||
evtTypes [4]journal.EventType
|
evtTypes [4]journal.EventType
|
||||||
|
journal journal.Journal
|
||||||
|
|
||||||
// failed abi.ChainEpoch // eps
|
// failed abi.ChainEpoch // eps
|
||||||
// failLk sync.Mutex
|
// failLk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
|
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
|
||||||
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
|
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting sector size: %w", err)
|
return nil, xerrors.Errorf("getting sector size: %w", err)
|
||||||
@ -62,11 +63,12 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
|
|||||||
actor: actor,
|
actor: actor,
|
||||||
worker: worker,
|
worker: worker,
|
||||||
evtTypes: [...]journal.EventType{
|
evtTypes: [...]journal.EventType{
|
||||||
evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"),
|
evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"),
|
||||||
evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"),
|
evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),
|
||||||
evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"),
|
evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"),
|
||||||
evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"),
|
evtTypeWdPoStFaults: j.RegisterEventType("wdpost", "faults_processed"),
|
||||||
},
|
},
|
||||||
|
journal: j,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,7 +168,7 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
|
|||||||
|
|
||||||
// onAbort is called when generating proofs or submitting proofs is aborted
|
// onAbort is called when generating proofs or submitting proofs is aborted
|
||||||
func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) {
|
func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) {
|
||||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||||
c := evtCommon{}
|
c := evtCommon{}
|
||||||
if ts != nil {
|
if ts != nil {
|
||||||
c.Deadline = deadline
|
c.Deadline = deadline
|
||||||
|
Loading…
Reference in New Issue
Block a user