Merge branch 'master' into refactor/net-upgrade
This commit is contained in:
commit
02dcb5e182
@ -27,7 +27,6 @@ type heightEvents struct {
|
||||
}
|
||||
|
||||
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||
|
||||
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
|
||||
defer span.End()
|
||||
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
|
||||
@ -150,7 +149,6 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||
//
|
||||
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
|
||||
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
|
||||
|
||||
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
|
||||
|
||||
best, err := e.tsc.best()
|
||||
|
@ -3,6 +3,7 @@ package state
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
typegen "github.com/whyrusleeping/cbor-gen"
|
||||
)
|
||||
@ -69,7 +70,7 @@ func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error {
|
||||
// Modify should be called when a value is modified in the map
|
||||
// Remove should be called when a value is removed from the map
|
||||
type AdtMapDiff interface {
|
||||
AsKey(key string) (adt.Keyer, error)
|
||||
AsKey(key string) (abi.Keyer, error)
|
||||
Add(key string, val *typegen.Deferred) error
|
||||
Modify(key string, from, to *typegen.Deferred) error
|
||||
Remove(key string, val *typegen.Deferred) error
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
cbornode "github.com/ipfs/go-ipld-cbor"
|
||||
typegen "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
|
||||
@ -78,21 +79,21 @@ func TestDiffAdtMap(t *testing.T) {
|
||||
mapA := adt.MakeEmptyMap(ctxstoreA)
|
||||
mapB := adt.MakeEmptyMap(ctxstoreB)
|
||||
|
||||
require.NoError(t, mapA.Put(adt.UIntKey(0), runtime.CBORBytes([]byte{0}))) // delete
|
||||
require.NoError(t, mapA.Put(abi.UIntKey(0), runtime.CBORBytes([]byte{0}))) // delete
|
||||
|
||||
require.NoError(t, mapA.Put(adt.UIntKey(1), runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, mapB.Put(adt.UIntKey(1), runtime.CBORBytes([]byte{1})))
|
||||
require.NoError(t, mapA.Put(abi.UIntKey(1), runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, mapB.Put(abi.UIntKey(1), runtime.CBORBytes([]byte{1})))
|
||||
|
||||
require.NoError(t, mapA.Put(adt.UIntKey(2), runtime.CBORBytes([]byte{1}))) // delete
|
||||
require.NoError(t, mapA.Put(abi.UIntKey(2), runtime.CBORBytes([]byte{1}))) // delete
|
||||
|
||||
require.NoError(t, mapA.Put(adt.UIntKey(3), runtime.CBORBytes([]byte{0}))) // noop
|
||||
require.NoError(t, mapB.Put(adt.UIntKey(3), runtime.CBORBytes([]byte{0})))
|
||||
require.NoError(t, mapA.Put(abi.UIntKey(3), runtime.CBORBytes([]byte{0}))) // noop
|
||||
require.NoError(t, mapB.Put(abi.UIntKey(3), runtime.CBORBytes([]byte{0})))
|
||||
|
||||
require.NoError(t, mapA.Put(adt.UIntKey(4), runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, mapB.Put(adt.UIntKey(4), runtime.CBORBytes([]byte{6})))
|
||||
require.NoError(t, mapA.Put(abi.UIntKey(4), runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, mapB.Put(abi.UIntKey(4), runtime.CBORBytes([]byte{6})))
|
||||
|
||||
require.NoError(t, mapB.Put(adt.UIntKey(5), runtime.CBORBytes{8})) // add
|
||||
require.NoError(t, mapB.Put(adt.UIntKey(6), runtime.CBORBytes{9})) // add
|
||||
require.NoError(t, mapB.Put(abi.UIntKey(5), runtime.CBORBytes{8})) // add
|
||||
require.NoError(t, mapB.Put(abi.UIntKey(6), runtime.CBORBytes{9})) // add
|
||||
|
||||
changes := new(TestDiffMap)
|
||||
|
||||
@ -134,12 +135,12 @@ type TestDiffMap struct {
|
||||
|
||||
var _ AdtMapDiff = &TestDiffMap{}
|
||||
|
||||
func (t *TestDiffMap) AsKey(key string) (adt.Keyer, error) {
|
||||
k, err := adt.ParseUIntKey(key)
|
||||
func (t *TestDiffMap) AsKey(key string) (abi.Keyer, error) {
|
||||
k, err := abi.ParseUIntKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return adt.UIntKey(k), nil
|
||||
return abi.UIntKey(k), nil
|
||||
}
|
||||
|
||||
func (t *TestDiffMap) Add(key string, val *typegen.Deferred) error {
|
||||
@ -148,7 +149,7 @@ func (t *TestDiffMap) Add(key string, val *typegen.Deferred) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k, err := adt.ParseUIntKey(key)
|
||||
k, err := abi.ParseUIntKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -172,7 +173,7 @@ func (t *TestDiffMap) Modify(key string, from, to *typegen.Deferred) error {
|
||||
return err
|
||||
}
|
||||
|
||||
k, err := adt.ParseUIntKey(key)
|
||||
k, err := abi.ParseUIntKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -198,7 +199,7 @@ func (t *TestDiffMap) Remove(key string, val *typegen.Deferred) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k, err := adt.ParseUIntKey(key)
|
||||
k, err := abi.ParseUIntKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -537,8 +537,8 @@ type MinerPreCommitChanges struct {
|
||||
Removed []miner.SectorPreCommitOnChainInfo
|
||||
}
|
||||
|
||||
func (m *MinerPreCommitChanges) AsKey(key string) (adt.Keyer, error) {
|
||||
sector, err := adt.ParseUIntKey(key)
|
||||
func (m *MinerPreCommitChanges) AsKey(key string) (abi.Keyer, error) {
|
||||
sector, err := abi.ParseUIntKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -662,12 +662,12 @@ type AddressChange struct {
|
||||
|
||||
type DiffInitActorStateFunc func(ctx context.Context, oldState *init_.State, newState *init_.State) (changed bool, user UserData, err error)
|
||||
|
||||
func (i *InitActorAddressChanges) AsKey(key string) (adt.Keyer, error) {
|
||||
func (i *InitActorAddressChanges) AsKey(key string) (abi.Keyer, error) {
|
||||
addr, err := address.NewFromBytes([]byte(key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return adt.AddrKey(addr), nil
|
||||
return abi.AddrKey(addr), nil
|
||||
}
|
||||
|
||||
func (i *InitActorAddressChanges) Add(key string, val *typegen.Deferred) error {
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
@ -50,7 +51,7 @@ func SetupInitActor(bs bstore.Blockstore, netname string, initialActors []genesi
|
||||
fmt.Printf("init set %s t0%d\n", e, counter)
|
||||
|
||||
value := cbg.CborInt(counter)
|
||||
if err := amap.Put(adt.AddrKey(e), &value); err != nil {
|
||||
if err := amap.Put(abi.AddrKey(e), &value); err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
counter = counter + 1
|
||||
@ -77,7 +78,7 @@ func SetupInitActor(bs bstore.Blockstore, netname string, initialActors []genesi
|
||||
fmt.Printf("init set %s t0%d\n", ainfo.Owner, counter)
|
||||
|
||||
value := cbg.CborInt(counter)
|
||||
if err := amap.Put(adt.AddrKey(ainfo.Owner), &value); err != nil {
|
||||
if err := amap.Put(abi.AddrKey(ainfo.Owner), &value); err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
counter = counter + 1
|
||||
@ -95,7 +96,7 @@ func SetupInitActor(bs bstore.Blockstore, netname string, initialActors []genesi
|
||||
return 0, nil, nil, xerrors.Errorf("unmarshaling account meta: %w", err)
|
||||
}
|
||||
value := cbg.CborInt(80)
|
||||
if err := amap.Put(adt.AddrKey(ainfo.Owner), &value); err != nil {
|
||||
if err := amap.Put(abi.AddrKey(ainfo.Owner), &value); err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
} else if rootVerifier.Type == genesis.TMultisig {
|
||||
@ -110,7 +111,7 @@ func SetupInitActor(bs bstore.Blockstore, netname string, initialActors []genesi
|
||||
fmt.Printf("init set %s t0%d\n", e, counter)
|
||||
|
||||
value := cbg.CborInt(counter)
|
||||
if err := amap.Put(adt.AddrKey(e), &value); err != nil {
|
||||
if err := amap.Put(abi.AddrKey(e), &value); err != nil {
|
||||
return 0, nil, nil, err
|
||||
}
|
||||
counter = counter + 1
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/sigs"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
|
||||
@ -83,6 +84,26 @@ const (
|
||||
localUpdates = "update"
|
||||
)
|
||||
|
||||
// Journal event types.
|
||||
const (
|
||||
evtTypeMpoolAdd = iota
|
||||
evtTypeMpoolRemove
|
||||
evtTypeMpoolRepub
|
||||
)
|
||||
|
||||
// MessagePoolEvt is the journal entry for message pool events.
|
||||
type MessagePoolEvt struct {
|
||||
Action string
|
||||
Messages []MessagePoolEvtMessage
|
||||
Error error `json:",omitempty"`
|
||||
}
|
||||
|
||||
type MessagePoolEvtMessage struct {
|
||||
types.Message
|
||||
|
||||
CID cid.Cid
|
||||
}
|
||||
|
||||
// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
|
||||
// away in the next fork.
|
||||
var strictBaseFeeValidation = false
|
||||
@ -140,6 +161,8 @@ type MessagePool struct {
|
||||
netName dtypes.NetworkName
|
||||
|
||||
sigValCache *lru.TwoQueueCache
|
||||
|
||||
evtTypes [3]journal.EventType
|
||||
}
|
||||
|
||||
type msgSet struct {
|
||||
@ -316,6 +339,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
||||
api: api,
|
||||
netName: netName,
|
||||
cfg: cfg,
|
||||
evtTypes: [...]journal.EventType{
|
||||
evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"),
|
||||
evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"),
|
||||
evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"),
|
||||
},
|
||||
}
|
||||
|
||||
// enable initial prunes
|
||||
@ -367,10 +395,12 @@ func (mp *MessagePool) runLoop() {
|
||||
if err := mp.republishPendingMessages(); err != nil {
|
||||
log.Errorf("error while republishing messages: %s", err)
|
||||
}
|
||||
|
||||
case <-mp.pruneTrigger:
|
||||
if err := mp.pruneExcessMessages(); err != nil {
|
||||
log.Errorf("failed to prune excess messages from mempool: %s", err)
|
||||
}
|
||||
|
||||
case <-mp.closer:
|
||||
mp.repubTk.Stop()
|
||||
return
|
||||
@ -700,6 +730,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
|
||||
Type: api.MpoolAdd,
|
||||
Message: m,
|
||||
}, localUpdates)
|
||||
|
||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
|
||||
return MessagePoolEvt{
|
||||
Action: "add",
|
||||
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -862,6 +900,12 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
|
||||
Message: m,
|
||||
}, localUpdates)
|
||||
|
||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
|
||||
return MessagePoolEvt{
|
||||
Action: "remove",
|
||||
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}
|
||||
})
|
||||
|
||||
mp.currentSize--
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
@ -146,6 +147,19 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
if len(msgs) > 0 {
|
||||
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
|
||||
msgs := make([]MessagePoolEvtMessage, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
msgs = append(msgs, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
|
||||
}
|
||||
return MessagePoolEvt{
|
||||
Action: "repub",
|
||||
Messages: msgs,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// track most recently republished messages
|
||||
republished := make(map[cid.Cid]struct{})
|
||||
for _, m := range msgs[:count] {
|
||||
|
@ -74,8 +74,8 @@ func (ta *testActor) Exports() []interface{} {
|
||||
|
||||
func (ta *testActor) Constructor(rt runtime.Runtime, params *abi.EmptyValue) *abi.EmptyValue {
|
||||
rt.ValidateImmediateCallerAcceptAny()
|
||||
rt.State().Create(&testActorState{11})
|
||||
fmt.Println("NEW ACTOR ADDRESS IS: ", rt.Message().Receiver())
|
||||
rt.StateCreate(&testActorState{11})
|
||||
fmt.Println("NEW ACTOR ADDRESS IS: ", rt.Receiver())
|
||||
|
||||
return abi.Empty
|
||||
}
|
||||
@ -83,7 +83,7 @@ func (ta *testActor) Constructor(rt runtime.Runtime, params *abi.EmptyValue) *ab
|
||||
func (ta *testActor) TestMethod(rt runtime.Runtime, params *abi.EmptyValue) *abi.EmptyValue {
|
||||
rt.ValidateImmediateCallerAcceptAny()
|
||||
var st testActorState
|
||||
rt.State().Readonly(&st)
|
||||
rt.StateReadonly(&st)
|
||||
|
||||
if rt.CurrEpoch() > testForkHeight {
|
||||
if st.HasUpgraded != 55 {
|
||||
|
@ -97,7 +97,7 @@ func GetPowerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr addres
|
||||
}
|
||||
|
||||
var claim power.Claim
|
||||
if _, err := cm.Get(adt.AddrKey(maddr), &claim); err != nil {
|
||||
if _, err := cm.Get(abi.AddrKey(maddr), &claim); err != nil {
|
||||
return power.Claim{}, power.Claim{}, err
|
||||
}
|
||||
|
||||
@ -289,7 +289,7 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma
|
||||
return false, err
|
||||
}
|
||||
|
||||
ok, err := claims.Get(power.AddrKey(maddr), nil)
|
||||
ok, err := claims.Get(abi.AddrKey(maddr), nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -74,6 +74,20 @@ func init() {
|
||||
// ReorgNotifee represents a callback that gets called upon reorgs.
|
||||
type ReorgNotifee func(rev, app []*types.TipSet) error
|
||||
|
||||
// Journal event types.
|
||||
const (
|
||||
evtTypeHeadChange = iota
|
||||
)
|
||||
|
||||
type HeadChangeEvt struct {
|
||||
From types.TipSetKey
|
||||
FromHeight abi.ChainEpoch
|
||||
To types.TipSetKey
|
||||
ToHeight abi.ChainEpoch
|
||||
RevertCount int
|
||||
ApplyCount int
|
||||
}
|
||||
|
||||
// ChainStore is the main point of access to chain data.
|
||||
//
|
||||
// Raw chain data is stored in the Blockstore, with relevant markers (genesis,
|
||||
@ -105,6 +119,8 @@ type ChainStore struct {
|
||||
tsCache *lru.ARCCache
|
||||
|
||||
vmcalls vm.SyscallBuilder
|
||||
|
||||
evtTypes [1]journal.EventType
|
||||
}
|
||||
|
||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
|
||||
@ -120,6 +136,10 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
|
||||
vmcalls: vmcalls,
|
||||
}
|
||||
|
||||
cs.evtTypes = [1]journal.EventType{
|
||||
evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"),
|
||||
}
|
||||
|
||||
ci := NewChainIndex(cs.LoadTipSet)
|
||||
|
||||
cs.cindex = ci
|
||||
@ -346,12 +366,15 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
||||
continue
|
||||
}
|
||||
|
||||
journal.Add("sync", map[string]interface{}{
|
||||
"op": "headChange",
|
||||
"from": r.old.Key(),
|
||||
"to": r.new.Key(),
|
||||
"rev": len(revert),
|
||||
"apply": len(apply),
|
||||
journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
|
||||
return HeadChangeEvt{
|
||||
From: r.old.Key(),
|
||||
FromHeight: r.old.Height(),
|
||||
To: r.new.Key(),
|
||||
ToHeight: r.new.Height(),
|
||||
RevertCount: len(revert),
|
||||
ApplyCount: len(apply),
|
||||
}
|
||||
})
|
||||
|
||||
// reverse the apply array
|
||||
|
@ -648,7 +648,7 @@ func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, b
|
||||
}
|
||||
|
||||
var claim power.Claim
|
||||
exist, err := cm.Get(adt.AddrKey(maddr), &claim)
|
||||
exist, err := cm.Get(abi.AddrKey(maddr), &claim)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -8,23 +8,20 @@ import (
|
||||
gruntime "runtime"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
statecbor "github.com/filecoin-project/go-state-types/cbor"
|
||||
"github.com/filecoin-project/go-state-types/cbor"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
rtt "github.com/filecoin-project/go-state-types/rt"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
vmr "github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
rt0 "github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/ipfs/go-cid"
|
||||
ipldcbor "github.com/ipfs/go-ipld-cbor"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors/aerrors"
|
||||
@ -33,20 +30,20 @@ import (
|
||||
)
|
||||
|
||||
type Runtime struct {
|
||||
types.Message
|
||||
rt0.Syscalls
|
||||
|
||||
ctx context.Context
|
||||
|
||||
vm *VM
|
||||
state *state.StateTree
|
||||
height abi.ChainEpoch
|
||||
cst cbor.IpldStore
|
||||
cst ipldcbor.IpldStore
|
||||
pricelist Pricelist
|
||||
vmr.Message
|
||||
|
||||
gasAvailable int64
|
||||
gasUsed int64
|
||||
|
||||
runtime.Syscalls
|
||||
|
||||
// address that started invoke chain
|
||||
origin address.Address
|
||||
originNonce uint64
|
||||
@ -87,11 +84,11 @@ type notFoundErr interface {
|
||||
IsNotFound() bool
|
||||
}
|
||||
|
||||
func (rt *Runtime) StoreGet(c cid.Cid, o statecbor.Unmarshaler) bool {
|
||||
func (rt *Runtime) StoreGet(c cid.Cid, o cbor.Unmarshaler) bool {
|
||||
if err := rt.cst.Get(context.TODO(), c, o); err != nil {
|
||||
var nfe notFoundErr
|
||||
if xerrors.As(err, &nfe) && nfe.IsNotFound() {
|
||||
if xerrors.As(err, new(cbor.SerializationError)) {
|
||||
if xerrors.As(err, new(ipldcbor.SerializationError)) {
|
||||
panic(aerrors.Newf(exitcode.ErrSerialization, "failed to unmarshal cbor object %s", err))
|
||||
}
|
||||
return false
|
||||
@ -102,10 +99,10 @@ func (rt *Runtime) StoreGet(c cid.Cid, o statecbor.Unmarshaler) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (rt *Runtime) StorePut(x statecbor.Marshaler) cid.Cid {
|
||||
func (rt *Runtime) StorePut(x cbor.Marshaler) cid.Cid {
|
||||
c, err := rt.cst.Put(context.TODO(), x)
|
||||
if err != nil {
|
||||
if xerrors.As(err, new(cbor.SerializationError)) {
|
||||
if xerrors.As(err, new(ipldcbor.SerializationError)) {
|
||||
panic(aerrors.Newf(exitcode.ErrSerialization, "failed to marshal cbor object %s", err))
|
||||
}
|
||||
panic(aerrors.Fatalf("failed to put cbor object: %s", err))
|
||||
@ -113,7 +110,7 @@ func (rt *Runtime) StorePut(x statecbor.Marshaler) cid.Cid {
|
||||
return c
|
||||
}
|
||||
|
||||
var _ vmr.Runtime = (*Runtime)(nil)
|
||||
var _ rt0.Runtime = (*Runtime)(nil)
|
||||
|
||||
func (rt *Runtime) shimCall(f func() interface{}) (rval []byte, aerr aerrors.ActorError) {
|
||||
defer func() {
|
||||
@ -141,7 +138,7 @@ func (rt *Runtime) shimCall(f func() interface{}) (rval []byte, aerr aerrors.Act
|
||||
return ret, nil
|
||||
case *abi.EmptyValue:
|
||||
return nil, nil
|
||||
case cbg.CBORMarshaler:
|
||||
case cbor.Marshaler:
|
||||
buf := new(bytes.Buffer)
|
||||
if err := ret.MarshalCBOR(buf); err != nil {
|
||||
return nil, aerrors.Absorb(err, 2, "failed to marshal response to cbor")
|
||||
@ -196,10 +193,6 @@ func (rt *Runtime) GetRandomnessFromBeacon(personalization crypto.DomainSeparati
|
||||
return res
|
||||
}
|
||||
|
||||
func (rt *Runtime) Store() vmr.Store {
|
||||
return rt
|
||||
}
|
||||
|
||||
func (rt *Runtime) NewActorAddress() address.Address {
|
||||
var b bytes.Buffer
|
||||
oa, _ := ResolveToKeyAddr(rt.vm.cstate, rt.vm.cst, rt.origin)
|
||||
@ -324,7 +317,7 @@ func (rt *Runtime) CurrEpoch() abi.ChainEpoch {
|
||||
return rt.height
|
||||
}
|
||||
|
||||
func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m statecbor.Marshaler, value abi.TokenAmount, out statecbor.Er) exitcode.ExitCode {
|
||||
func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m cbor.Marshaler, value abi.TokenAmount, out cbor.Er) exitcode.ExitCode {
|
||||
if !rt.allowInternal {
|
||||
rt.Abortf(exitcode.SysErrorIllegalActor, "runtime.Send() is currently disallowed")
|
||||
}
|
||||
@ -348,10 +341,8 @@ func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m statecbor.Ma
|
||||
_ = rt.chargeGasSafe(gasOnActorExec)
|
||||
|
||||
if err := out.UnmarshalCBOR(bytes.NewReader(ret)); err != nil {
|
||||
// REVIEW: always fatal?
|
||||
panic(err)
|
||||
rt.Abortf(exitcode.ErrSerialization, "failed to unmarshal return value: %s", err)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
@ -396,7 +387,7 @@ func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum,
|
||||
return ret, errSend
|
||||
}
|
||||
|
||||
func (rt *Runtime) StateCreate(obj statecbor.Marshaler) {
|
||||
func (rt *Runtime) StateCreate(obj cbor.Marshaler) {
|
||||
c := rt.StorePut(obj)
|
||||
err := rt.stateCommit(EmptyObjectCid, c)
|
||||
if err != nil {
|
||||
@ -404,7 +395,7 @@ func (rt *Runtime) StateCreate(obj statecbor.Marshaler) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Runtime) StateReadonly(obj statecbor.Unmarshaler) {
|
||||
func (rt *Runtime) StateReadonly(obj cbor.Unmarshaler) {
|
||||
act, err := rt.state.GetActor(rt.Receiver())
|
||||
if err != nil {
|
||||
rt.Abortf(exitcode.SysErrorIllegalArgument, "failed to get actor for Readonly state: %s", err)
|
||||
@ -412,7 +403,7 @@ func (rt *Runtime) StateReadonly(obj statecbor.Unmarshaler) {
|
||||
rt.StoreGet(act.Head, obj)
|
||||
}
|
||||
|
||||
func (rt *Runtime) StateTransaction(obj statecbor.Er, f func()) {
|
||||
func (rt *Runtime) StateTransaction(obj cbor.Er, f func()) {
|
||||
if obj == nil {
|
||||
rt.Abortf(exitcode.SysErrorIllegalActor, "Must not pass nil to Transaction()")
|
||||
}
|
||||
|
@ -42,6 +42,6 @@ func TestRuntimePutErrors(t *testing.T) {
|
||||
cst: cbor.NewCborStore(nil),
|
||||
}
|
||||
|
||||
rt.Put(&NotAVeryGoodMarshaler{})
|
||||
rt.StorePut(&NotAVeryGoodMarshaler{})
|
||||
t.Error("expected panic")
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func (vm *VM) makeRuntime(ctx context.Context, msg *types.Message, origin addres
|
||||
rt.Abortf(exitcode.SysErrInvalidReceiver, "resolve msg.From address failed")
|
||||
}
|
||||
vmm.From = resF
|
||||
rt.Message = &vmm
|
||||
rt.Message = vmm
|
||||
|
||||
return rt
|
||||
}
|
||||
|
@ -659,7 +659,7 @@ func handleHamtEpoch(ctx context.Context, api api.FullNode, r cid.Cid) error {
|
||||
}
|
||||
|
||||
return mp.ForEach(nil, func(key string) error {
|
||||
ik, err := adt.ParseIntKey(key)
|
||||
ik, err := abi.ParseIntKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSe
|
||||
var claim power.Claim
|
||||
// get miner claim from power actors claim map and store if found, else the miner had no claim at
|
||||
// this tipset
|
||||
found, err := minersClaims.Get(adt.AddrKey(act.addr), &claim)
|
||||
found, err := minersClaims.Get(abi.AddrKey(act.addr), &claim)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -136,7 +136,8 @@ create unique index if not exists block_cid_uindex
|
||||
on blocks (cid,height);
|
||||
|
||||
create materialized view if not exists state_heights
|
||||
as select distinct height, parentstateroot from blocks;
|
||||
as select min(b.height) height, b.parentstateroot
|
||||
from blocks b group by b.parentstateroot;
|
||||
|
||||
create index if not exists state_heights_height_index
|
||||
on state_heights (height);
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
@ -336,7 +337,7 @@ var verifRegCheckVerifierCmd = &cli.Command{
|
||||
}
|
||||
|
||||
var dcap verifreg.DataCap
|
||||
if found, err := vh.Get(adt.AddrKey(vaddr), &dcap); err != nil {
|
||||
if found, err := vh.Get(abi.AddrKey(vaddr), &dcap); err != nil {
|
||||
return err
|
||||
} else if !found {
|
||||
return fmt.Errorf("not found")
|
||||
|
@ -44,6 +44,7 @@ import (
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -459,6 +460,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
||||
return err
|
||||
}
|
||||
|
||||
if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil {
|
||||
journal.J = jrnl
|
||||
} else {
|
||||
return fmt.Errorf("failed to open filesystem journal: %w", err)
|
||||
}
|
||||
|
||||
m := miner.NewMiner(api, epp, a, slashfilter.New(mds))
|
||||
{
|
||||
if err := m.Start(ctx); err != nil {
|
||||
|
@ -100,18 +100,14 @@ type SendReturn struct {
|
||||
// passed parameters.
|
||||
func (a Actor) Send(rt runtime.Runtime, args *SendArgs) *SendReturn {
|
||||
rt.ValidateImmediateCallerAcceptAny()
|
||||
ret, code := rt.Send(
|
||||
var out runtime.CBORBytes
|
||||
code := rt.Send(
|
||||
args.To,
|
||||
args.Method,
|
||||
runtime.CBORBytes(args.Params),
|
||||
args.Value,
|
||||
&out,
|
||||
)
|
||||
var out runtime.CBORBytes
|
||||
if ret != nil {
|
||||
if err := ret.Into(&out); err != nil {
|
||||
rt.Abortf(exitcode.ErrIllegalState, "failed to unmarshal send return: %v", err)
|
||||
}
|
||||
}
|
||||
return &SendReturn{
|
||||
Return: out,
|
||||
Code: code,
|
||||
@ -217,14 +213,14 @@ func (a Actor) MutateState(rt runtime.Runtime, args *MutateStateArgs) *abi.Empty
|
||||
var st State
|
||||
switch args.Branch {
|
||||
case MutateInTransaction:
|
||||
rt.State().Transaction(&st, func() {
|
||||
rt.StateTransaction(&st, func() {
|
||||
st.Value = args.Value
|
||||
})
|
||||
case MutateReadonly:
|
||||
rt.State().Readonly(&st)
|
||||
rt.StateReadonly(&st)
|
||||
st.Value = args.Value
|
||||
case MutateAfterTransaction:
|
||||
rt.State().Transaction(&st, func() {
|
||||
rt.StateTransaction(&st, func() {
|
||||
st.Value = args.Value + "-in"
|
||||
})
|
||||
st.Value = args.Value
|
||||
|
@ -46,7 +46,7 @@ func TestMutateStateInTransaction(t *testing.T) {
|
||||
var a Actor
|
||||
|
||||
rt.ExpectValidateCallerAny()
|
||||
rt.Create(&State{})
|
||||
rt.StateCreate(&State{})
|
||||
|
||||
val := "__mutstat test"
|
||||
rt.Call(a.MutateState, &MutateStateArgs{
|
||||
@ -72,7 +72,7 @@ func TestMutateStateAfterTransaction(t *testing.T) {
|
||||
var a Actor
|
||||
|
||||
rt.ExpectValidateCallerAny()
|
||||
rt.Create(&State{})
|
||||
rt.StateCreate(&State{})
|
||||
|
||||
val := "__mutstat test"
|
||||
rt.Call(a.MutateState, &MutateStateArgs{
|
||||
@ -99,7 +99,7 @@ func TestMutateStateReadonly(t *testing.T) {
|
||||
var a Actor
|
||||
|
||||
rt.ExpectValidateCallerAny()
|
||||
rt.Create(&State{})
|
||||
rt.StateCreate(&State{})
|
||||
|
||||
val := "__mutstat test"
|
||||
rt.Call(a.MutateState, &MutateStateArgs{
|
||||
|
6
extern/storage-sealing/fsm.go
vendored
6
extern/storage-sealing/fsm.go
vendored
@ -189,6 +189,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
state.Log = append(state.Log, l)
|
||||
}
|
||||
|
||||
if m.notifee != nil {
|
||||
defer func(before SectorInfo) {
|
||||
m.notifee(before, *state)
|
||||
}(*state) // take safe-ish copy of the before state (except for nested pointers)
|
||||
}
|
||||
|
||||
p := fsmPlanners[state.State]
|
||||
if p == nil {
|
||||
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)
|
||||
|
14
extern/storage-sealing/fsm_test.go
vendored
14
extern/storage-sealing/fsm_test.go
vendored
@ -27,6 +27,7 @@ type test struct {
|
||||
}
|
||||
|
||||
func TestHappyPath(t *testing.T) {
|
||||
var notif []struct{ before, after SectorInfo }
|
||||
ma, _ := address.NewIDAddress(55151)
|
||||
m := test{
|
||||
s: &Sealing{
|
||||
@ -34,6 +35,9 @@ func TestHappyPath(t *testing.T) {
|
||||
stats: SectorStats{
|
||||
bySector: map[abi.SectorID]statSectorState{},
|
||||
},
|
||||
notifee: func(before, after SectorInfo) {
|
||||
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||
},
|
||||
},
|
||||
t: t,
|
||||
state: &SectorInfo{State: Packing},
|
||||
@ -68,6 +72,16 @@ func TestHappyPath(t *testing.T) {
|
||||
|
||||
m.planSingle(SectorFinalized{})
|
||||
require.Equal(m.t, m.state.State, Proving)
|
||||
|
||||
expected := []SectorState{Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
|
||||
for i, n := range notif {
|
||||
if n.before.State != expected[i] {
|
||||
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
|
||||
}
|
||||
if n.after.State != expected[i+1] {
|
||||
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeedRevert(t *testing.T) {
|
||||
|
9
extern/storage-sealing/sealing.go
vendored
9
extern/storage-sealing/sealing.go
vendored
@ -61,6 +61,8 @@ type SealingAPI interface {
|
||||
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
|
||||
}
|
||||
|
||||
type SectorStateNotifee func(before, after SectorInfo)
|
||||
|
||||
type Sealing struct {
|
||||
api SealingAPI
|
||||
feeCfg FeeConfig
|
||||
@ -79,6 +81,8 @@ type Sealing struct {
|
||||
upgradeLk sync.Mutex
|
||||
toUpgrade map[abi.SectorNumber]struct{}
|
||||
|
||||
notifee SectorStateNotifee
|
||||
|
||||
stats SectorStats
|
||||
|
||||
getConfig GetSealingConfigFunc
|
||||
@ -101,7 +105,7 @@ type UnsealedSectorInfo struct {
|
||||
pieceSizes []abi.UnpaddedPieceSize
|
||||
}
|
||||
|
||||
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
|
||||
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing {
|
||||
s := &Sealing{
|
||||
api: api,
|
||||
feeCfg: fc,
|
||||
@ -118,6 +122,9 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
||||
},
|
||||
|
||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||
|
||||
notifee: notifee,
|
||||
|
||||
getConfig: gc,
|
||||
|
||||
stats: SectorStats{
|
||||
|
2
go.mod
2
go.mod
@ -36,7 +36,7 @@ require (
|
||||
github.com/filecoin-project/go-statemachine v0.0.0-20200813232949-df9b130df370
|
||||
github.com/filecoin-project/go-statestore v0.1.0
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
|
||||
github.com/filecoin-project/specs-actors v0.9.9-0.20200911231631-727cd8845d30
|
||||
github.com/filecoin-project/specs-actors v0.9.10
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.1
|
||||
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
|
||||
|
7
go.sum
7
go.sum
@ -91,7 +91,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||
github.com/briandowns/spinner v1.11.1 h1:OixPqDEcX3juo5AjQZAnFPbeUA0jvkp2qzB5gOZJ/L0=
|
||||
github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ=
|
||||
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
|
||||
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
|
||||
@ -243,7 +242,6 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df h1:m2esXSuGBkuXlRyCsl1a/7/FkFam63o1OzIgzaHtOfI=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc h1:1vr/LoqGq5m5g37Q3sNSAjfwF1uJY0zmiHcvnxY6hik=
|
||||
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
|
||||
@ -255,10 +253,9 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
|
||||
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
|
||||
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
|
||||
github.com/filecoin-project/specs-actors v0.9.7 h1:7PAZ8kdqwBdmgf/23FCkQZLCXcVu02XJrkpkhBikiA8=
|
||||
github.com/filecoin-project/specs-actors v0.9.7/go.mod h1:wM2z+kwqYgXn5Z7scV1YHLyd1Q1cy0R8HfTIWQ0BFGU=
|
||||
github.com/filecoin-project/specs-actors v0.9.9-0.20200911231631-727cd8845d30 h1:6Kn6y3TpJbk5BsvhVha+3jr7C3gAAJq0rCnwUYOWRl0=
|
||||
github.com/filecoin-project/specs-actors v0.9.9-0.20200911231631-727cd8845d30/go.mod h1:czlvLQGEX0fjLLfdNHD7xLymy6L3n7aQzRWzsYGf+ys=
|
||||
github.com/filecoin-project/specs-actors v0.9.10 h1:gU0TrRhgkCsBEOP42sGDE7RQuR0Cov9hJhBqq+RJmjU=
|
||||
github.com/filecoin-project/specs-actors v0.9.10/go.mod h1:czlvLQGEX0fjLLfdNHD7xLymy6L3n7aQzRWzsYGf+ys=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796 h1:dJsTPWpG2pcTeojO2pyn0c6l+x/3MZYCBgo/9d11JEk=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200907031224-ed2e5cd13796/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.1 h1:5fNF76nl4qolEvcIsjc0kUADlTMVHO73tW4kXXPnsus=
|
||||
|
136
journal/fs.go
Normal file
136
journal/fs.go
Normal file
@ -0,0 +1,136 @@
|
||||
package journal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
const RFC3339nocolon = "2006-01-02T150405Z0700"
|
||||
|
||||
// fsJournal is a basic journal backed by files on a filesystem.
|
||||
type fsJournal struct {
|
||||
EventTypeRegistry
|
||||
|
||||
dir string
|
||||
sizeLimit int64
|
||||
|
||||
fi *os.File
|
||||
fSize int64
|
||||
|
||||
incoming chan *Event
|
||||
|
||||
closing chan struct{}
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// OpenFSJournal constructs a rolling filesystem journal, with a default
|
||||
// per-file size limit of 1GiB.
|
||||
func OpenFSJournal(lr repo.LockedRepo, disabled DisabledEvents) (Journal, error) {
|
||||
dir := filepath.Join(lr.Path(), "journal")
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
|
||||
}
|
||||
|
||||
f := &fsJournal{
|
||||
EventTypeRegistry: NewEventTypeRegistry(disabled),
|
||||
dir: dir,
|
||||
sizeLimit: 1 << 30,
|
||||
incoming: make(chan *Event, 32),
|
||||
closing: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := f.rollJournalFile(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go f.runLoop()
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (f *fsJournal) RecordEvent(evtType EventType, supplier func() interface{}) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Warnf("recovered from panic while recording journal event; type=%s, err=%v", evtType, r)
|
||||
}
|
||||
}()
|
||||
|
||||
if !evtType.Enabled() {
|
||||
return
|
||||
}
|
||||
|
||||
je := &Event{
|
||||
EventType: evtType,
|
||||
Timestamp: build.Clock.Now(),
|
||||
Data: supplier(),
|
||||
}
|
||||
select {
|
||||
case f.incoming <- je:
|
||||
case <-f.closing:
|
||||
log.Warnw("journal closed but tried to log event", "event", je)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fsJournal) Close() error {
|
||||
close(f.closing)
|
||||
<-f.closed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsJournal) putEvent(evt *Event) error {
|
||||
b, err := json.Marshal(evt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := f.fi.Write(append(b, '\n'))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f.fSize += int64(n)
|
||||
|
||||
if f.fSize >= f.sizeLimit {
|
||||
_ = f.rollJournalFile()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsJournal) rollJournalFile() error {
|
||||
if f.fi != nil {
|
||||
_ = f.fi.Close()
|
||||
}
|
||||
|
||||
nfi, err := os.Create(filepath.Join(f.dir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open journal file: %w", err)
|
||||
}
|
||||
|
||||
f.fi = nfi
|
||||
f.fSize = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsJournal) runLoop() {
|
||||
defer close(f.closed)
|
||||
|
||||
for {
|
||||
select {
|
||||
case je := <-f.incoming:
|
||||
if err := f.putEvent(je); err != nil {
|
||||
log.Errorw("failed to write out journal event", "event", je, "err", err)
|
||||
}
|
||||
case <-f.closing:
|
||||
_ = f.fi.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
9
journal/global.go
Normal file
9
journal/global.go
Normal file
@ -0,0 +1,9 @@
|
||||
package journal
|
||||
|
||||
var (
|
||||
// J is a globally accessible Journal. It starts being NilJournal, and early
|
||||
// during the Lotus initialization routine, it is reset to whichever Journal
|
||||
// is configured (by default, the filesystem journal). Components can safely
|
||||
// record in the journal by calling: journal.J.RecordEvent(...).
|
||||
J Journal = NilJournal() // nolint
|
||||
)
|
@ -1,152 +0,0 @@
|
||||
package journal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
)
|
||||
|
||||
func InitializeSystemJournal(dir string) error {
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
j, err := OpenFSJournal(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentJournal = j
|
||||
return nil
|
||||
}
|
||||
|
||||
func Add(sys string, val interface{}) {
|
||||
if currentJournal == nil {
|
||||
log.Warn("no journal configured")
|
||||
return
|
||||
}
|
||||
currentJournal.AddEntry(sys, val)
|
||||
}
|
||||
|
||||
var log = logging.Logger("journal")
|
||||
|
||||
var currentJournal Journal
|
||||
|
||||
type Journal interface {
|
||||
AddEntry(system string, obj interface{})
|
||||
Close() error
|
||||
}
|
||||
|
||||
// fsJournal is a basic journal backed by files on a filesystem
|
||||
type fsJournal struct {
|
||||
fi *os.File
|
||||
fSize int64
|
||||
|
||||
journalDir string
|
||||
|
||||
incoming chan *JournalEntry
|
||||
journalSizeLimit int64
|
||||
|
||||
closing chan struct{}
|
||||
}
|
||||
|
||||
func OpenFSJournal(dir string) (Journal, error) {
|
||||
fsj := &fsJournal{
|
||||
journalDir: dir,
|
||||
incoming: make(chan *JournalEntry, 32),
|
||||
journalSizeLimit: 1 << 30,
|
||||
closing: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := fsj.rollJournalFile(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go fsj.runLoop()
|
||||
|
||||
return fsj, nil
|
||||
}
|
||||
|
||||
type JournalEntry struct {
|
||||
System string
|
||||
Timestamp time.Time
|
||||
Val interface{}
|
||||
}
|
||||
|
||||
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
|
||||
b, err := json.Marshal(je)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := fsj.fi.Write(append(b, '\n'))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fsj.fSize += int64(n)
|
||||
|
||||
if fsj.fSize >= fsj.journalSizeLimit {
|
||||
return fsj.rollJournalFile()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const RFC3339nocolon = "2006-01-02T150405Z0700"
|
||||
|
||||
func (fsj *fsJournal) rollJournalFile() error {
|
||||
if fsj.fi != nil {
|
||||
err := fsj.fi.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open journal file: %w", err)
|
||||
}
|
||||
|
||||
fsj.fi = nfi
|
||||
fsj.fSize = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fsj *fsJournal) runLoop() {
|
||||
for {
|
||||
select {
|
||||
case je := <-fsj.incoming:
|
||||
if err := fsj.putEntry(je); err != nil {
|
||||
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
|
||||
}
|
||||
case <-fsj.closing:
|
||||
if err := fsj.fi.Close(); err != nil {
|
||||
log.Errorw("failed to close journal", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
|
||||
je := &JournalEntry{
|
||||
System: system,
|
||||
Timestamp: build.Clock.Now(),
|
||||
Val: obj,
|
||||
}
|
||||
select {
|
||||
case fsj.incoming <- je:
|
||||
case <-fsj.closing:
|
||||
log.Warnw("journal closed but tried to log event", "entry", je)
|
||||
}
|
||||
}
|
||||
|
||||
func (fsj *fsJournal) Close() error {
|
||||
close(fsj.closing)
|
||||
return nil
|
||||
}
|
16
journal/nil.go
Normal file
16
journal/nil.go
Normal file
@ -0,0 +1,16 @@
|
||||
package journal
|
||||
|
||||
type nilJournal struct{}
|
||||
|
||||
// nilj is a singleton nil journal.
|
||||
var nilj Journal = &nilJournal{}
|
||||
|
||||
func NilJournal() Journal {
|
||||
return nilj
|
||||
}
|
||||
|
||||
func (n *nilJournal) RegisterEventType(_, _ string) EventType { return EventType{} }
|
||||
|
||||
func (n *nilJournal) RecordEvent(_ EventType, _ func() interface{}) {}
|
||||
|
||||
func (n *nilJournal) Close() error { return nil }
|
57
journal/registry.go
Normal file
57
journal/registry.go
Normal file
@ -0,0 +1,57 @@
|
||||
package journal
|
||||
|
||||
import "sync"
|
||||
|
||||
// EventTypeRegistry is a component that constructs tracked EventType tokens,
|
||||
// for usage with a Journal.
|
||||
type EventTypeRegistry interface {
|
||||
|
||||
// RegisterEventType introduces a new event type to a journal, and
|
||||
// returns an EventType token that components can later use to check whether
|
||||
// journalling for that type is enabled/suppressed, and to tag journal
|
||||
// entries appropriately.
|
||||
RegisterEventType(system, event string) EventType
|
||||
}
|
||||
|
||||
// eventTypeRegistry is an embeddable mixin that takes care of tracking disabled
|
||||
// event types, and returning initialized/safe EventTypes when requested.
|
||||
type eventTypeRegistry struct {
|
||||
sync.Mutex
|
||||
|
||||
m map[string]EventType
|
||||
}
|
||||
|
||||
var _ EventTypeRegistry = (*eventTypeRegistry)(nil)
|
||||
|
||||
func NewEventTypeRegistry(disabled DisabledEvents) EventTypeRegistry {
|
||||
ret := &eventTypeRegistry{
|
||||
m: make(map[string]EventType, len(disabled)+32), // + extra capacity.
|
||||
}
|
||||
|
||||
for _, et := range disabled {
|
||||
et.enabled, et.safe = false, true
|
||||
ret.m[et.System+":"+et.Event] = et
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (d *eventTypeRegistry) RegisterEventType(system, event string) EventType {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
key := system + ":" + event
|
||||
if et, ok := d.m[key]; ok {
|
||||
return et
|
||||
}
|
||||
|
||||
et := EventType{
|
||||
System: system,
|
||||
Event: event,
|
||||
enabled: true,
|
||||
safe: true,
|
||||
}
|
||||
|
||||
d.m[key] = et
|
||||
return et
|
||||
}
|
49
journal/registry_test.go
Normal file
49
journal/registry_test.go
Normal file
@ -0,0 +1,49 @@
|
||||
package journal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDisabledEvents(t *testing.T) {
|
||||
req := require.New(t)
|
||||
|
||||
test := func(dis DisabledEvents) func(*testing.T) {
|
||||
return func(t *testing.T) {
|
||||
registry := NewEventTypeRegistry(dis)
|
||||
|
||||
reg1 := registry.RegisterEventType("system1", "disabled1")
|
||||
reg2 := registry.RegisterEventType("system2", "disabled2")
|
||||
|
||||
req.False(reg1.Enabled())
|
||||
req.False(reg2.Enabled())
|
||||
req.True(reg1.safe)
|
||||
req.True(reg2.safe)
|
||||
|
||||
reg3 := registry.RegisterEventType("system3", "enabled3")
|
||||
req.True(reg3.Enabled())
|
||||
req.True(reg3.safe)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("direct", test(DisabledEvents{
|
||||
EventType{System: "system1", Event: "disabled1"},
|
||||
EventType{System: "system2", Event: "disabled2"},
|
||||
}))
|
||||
|
||||
dis, err := ParseDisabledEvents("system1:disabled1,system2:disabled2")
|
||||
req.NoError(err)
|
||||
|
||||
t.Run("parsed", test(dis))
|
||||
|
||||
dis, err = ParseDisabledEvents(" system1:disabled1 , system2:disabled2 ")
|
||||
req.NoError(err)
|
||||
|
||||
t.Run("parsed_spaces", test(dis))
|
||||
}
|
||||
|
||||
func TestParseDisableEvents(t *testing.T) {
|
||||
_, err := ParseDisabledEvents("system1:disabled1:failed,system2:disabled2")
|
||||
require.Error(t, err)
|
||||
}
|
102
journal/types.go
Normal file
102
journal/types.go
Normal file
@ -0,0 +1,102 @@
|
||||
package journal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
)
|
||||
|
||||
var log = logging.Logger("journal")
|
||||
|
||||
var (
|
||||
// DefaultDisabledEvents lists the journal events disabled by
|
||||
// default, usually because they are considered noisy.
|
||||
DefaultDisabledEvents = DisabledEvents{
|
||||
EventType{System: "mpool", Event: "add"},
|
||||
EventType{System: "mpool", Event: "remove"},
|
||||
}
|
||||
)
|
||||
|
||||
// DisabledEvents is the set of event types whose journaling is suppressed.
|
||||
type DisabledEvents []EventType
|
||||
|
||||
// ParseDisabledEvents parses a string of the form: "system1:event1,system2:event2[,...]"
|
||||
// into a DisabledEvents object, returning an error if the string failed to parse.
|
||||
//
|
||||
// It sanitizes strings via strings.TrimSpace.
|
||||
func ParseDisabledEvents(s string) (DisabledEvents, error) {
|
||||
s = strings.TrimSpace(s) // sanitize
|
||||
evts := strings.Split(s, ",")
|
||||
ret := make(DisabledEvents, 0, len(evts))
|
||||
for _, evt := range evts {
|
||||
evt = strings.TrimSpace(evt) // sanitize
|
||||
s := strings.Split(evt, ":")
|
||||
if len(s) != 2 {
|
||||
return nil, fmt.Errorf("invalid event type: %s", s)
|
||||
}
|
||||
ret = append(ret, EventType{System: s[0], Event: s[1]})
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// EventType represents the signature of an event.
|
||||
type EventType struct {
|
||||
System string
|
||||
Event string
|
||||
|
||||
// enabled stores whether this event type is enabled.
|
||||
enabled bool
|
||||
|
||||
// safe is a sentinel marker that's set to true if this EventType was
|
||||
// constructed correctly (via Journal#RegisterEventType).
|
||||
safe bool
|
||||
}
|
||||
|
||||
func (et EventType) String() string {
|
||||
return et.System + ":" + et.Event
|
||||
}
|
||||
|
||||
// Enabled returns whether this event type is enabled in the journaling
|
||||
// subsystem. Users are advised to check this before actually attempting to
|
||||
// add a journal entry, as it helps bypass object construction for events that
|
||||
// would be discarded anyway.
|
||||
//
|
||||
// All event types are enabled by default, and specific event types can only
|
||||
// be disabled at Journal construction time.
|
||||
func (et EventType) Enabled() bool {
|
||||
return et.safe && et.enabled
|
||||
}
|
||||
|
||||
// Journal represents an audit trail of system actions.
|
||||
//
|
||||
// Every entry is tagged with a timestamp, a system name, and an event name.
|
||||
// The supplied data can be any type, as long as it is JSON serializable,
|
||||
// including structs, map[string]interface{}, or primitive types.
|
||||
//
|
||||
// For cleanliness and type safety, we recommend to use typed events. See the
|
||||
// *Evt struct types in this package for more info.
|
||||
type Journal interface {
|
||||
EventTypeRegistry
|
||||
|
||||
// RecordEvent records this event to the journal, if and only if the
|
||||
// EventType is enabled. If so, it calls the supplier function to obtain
|
||||
// the payload to record.
|
||||
//
|
||||
// Implementations MUST recover from panics raised by the supplier function.
|
||||
RecordEvent(evtType EventType, supplier func() interface{})
|
||||
|
||||
// Close closes this journal for further writing.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Event represents a journal entry.
|
||||
//
|
||||
// See godocs on Journal for more information.
|
||||
type Event struct {
|
||||
EventType
|
||||
|
||||
Timestamp time.Time
|
||||
Data interface{}
|
||||
}
|
76
markets/journal.go
Normal file
76
markets/journal.go
Normal file
@ -0,0 +1,76 @@
|
||||
package markets
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
)
|
||||
|
||||
type StorageClientEvt struct {
|
||||
Event string
|
||||
Deal storagemarket.ClientDeal
|
||||
}
|
||||
|
||||
type StorageProviderEvt struct {
|
||||
Event string
|
||||
Deal storagemarket.MinerDeal
|
||||
}
|
||||
|
||||
type RetrievalClientEvt struct {
|
||||
Event string
|
||||
Deal retrievalmarket.ClientDealState
|
||||
}
|
||||
|
||||
type RetrievalProviderEvt struct {
|
||||
Event string
|
||||
Deal retrievalmarket.ProviderDealState
|
||||
}
|
||||
|
||||
// StorageClientJournaler records journal events from the storage client.
|
||||
func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
||||
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
||||
journal.J.RecordEvent(evtType, func() interface{} {
|
||||
return StorageClientEvt{
|
||||
Event: storagemarket.ClientEvents[event],
|
||||
Deal: deal,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// StorageProviderJournaler records journal events from the storage provider.
|
||||
func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||
journal.J.RecordEvent(evtType, func() interface{} {
|
||||
return StorageProviderEvt{
|
||||
Event: storagemarket.ProviderEvents[event],
|
||||
Deal: deal,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RetrievalClientJournaler records journal events from the retrieval client.
|
||||
func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
||||
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
||||
journal.J.RecordEvent(evtType, func() interface{} {
|
||||
return RetrievalClientEvt{
|
||||
Event: retrievalmarket.ClientEvents[event],
|
||||
Deal: deal,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RetrievalProviderJournaler records journal events from the retrieval provider.
|
||||
func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
||||
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
||||
journal.J.RecordEvent(evtType, func() interface{} {
|
||||
return RetrievalProviderEvt{
|
||||
Event: retrievalmarket.ProviderEvents[event],
|
||||
Deal: deal,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -33,6 +33,11 @@ import (
|
||||
|
||||
var log = logging.Logger("miner")
|
||||
|
||||
// Journal event types.
|
||||
const (
|
||||
evtTypeBlockMined = iota
|
||||
)
|
||||
|
||||
// returns a callback reporting whether we mined a blocks in this round
|
||||
type waitFunc func(ctx context.Context, baseTime uint64) (func(bool, abi.ChainEpoch, error), abi.ChainEpoch, error)
|
||||
|
||||
@ -68,6 +73,9 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address,
|
||||
|
||||
sf: sf,
|
||||
minedBlockHeights: arc,
|
||||
evtTypes: [...]journal.EventType{
|
||||
evtTypeBlockMined: journal.J.RegisterEventType("miner", "block_mined"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,6 +95,8 @@ type Miner struct {
|
||||
|
||||
sf *slashfilter.SlashFilter
|
||||
minedBlockHeights *lru.ARCCache
|
||||
|
||||
evtTypes [1]journal.EventType
|
||||
}
|
||||
|
||||
func (m *Miner) Address() address.Address {
|
||||
@ -220,12 +230,14 @@ func (m *Miner) mine(ctx context.Context) {
|
||||
onDone(b != nil, h, nil)
|
||||
|
||||
if b != nil {
|
||||
journal.Add("blockMined", map[string]interface{}{
|
||||
journal.J.RecordEvent(m.evtTypes[evtTypeBlockMined], func() interface{} {
|
||||
return map[string]interface{}{
|
||||
"parents": base.TipSet.Cids(),
|
||||
"nulls": base.NullRounds,
|
||||
"epoch": b.Header.Height,
|
||||
"timestamp": b.Header.Timestamp,
|
||||
"cid": b.Header.Cid(),
|
||||
}
|
||||
})
|
||||
|
||||
btime := time.Unix(int64(b.Header.Timestamp), 0)
|
||||
|
@ -3,6 +3,7 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
@ -44,6 +45,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
|
||||
@ -67,6 +69,10 @@ import (
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
// EnvJournalDisabledEvents is the environment variable through which disabled
|
||||
// journal events can be customized.
|
||||
const EnvJournalDisabledEvents = "LOTUS_JOURNAL_DISABLED_EVENTS"
|
||||
|
||||
//nolint:deadcode,varcheck
|
||||
var log = logging.Logger("builder")
|
||||
|
||||
@ -91,11 +97,16 @@ var (
|
||||
|
||||
type invoke int
|
||||
|
||||
// Invokes are called in the order they are defined.
|
||||
//nolint:golint
|
||||
const (
|
||||
// InitJournal at position 0 initializes the journal global var as soon as
|
||||
// the system starts, so that it's available for all other components.
|
||||
InitJournalKey = invoke(iota)
|
||||
|
||||
// libp2p
|
||||
|
||||
PstoreAddSelfKeysKey = invoke(iota)
|
||||
PstoreAddSelfKeysKey
|
||||
StartListeningKey
|
||||
BootstrapKey
|
||||
|
||||
@ -123,7 +134,6 @@ const (
|
||||
HeadMetricsKey
|
||||
SettlePaymentChannelsKey
|
||||
RunPeerTaggerKey
|
||||
JournalKey
|
||||
|
||||
SetApiEndpointKey
|
||||
|
||||
@ -151,11 +161,25 @@ type Settings struct {
|
||||
|
||||
func defaults() []Option {
|
||||
return []Option{
|
||||
// global system journal.
|
||||
Override(new(journal.DisabledEvents), func() journal.DisabledEvents {
|
||||
if env, ok := os.LookupEnv(EnvJournalDisabledEvents); ok {
|
||||
if ret, err := journal.ParseDisabledEvents(env); err == nil {
|
||||
return ret
|
||||
}
|
||||
}
|
||||
// fallback if env variable is not set, or if it failed to parse.
|
||||
return journal.DefaultDisabledEvents
|
||||
}),
|
||||
Override(new(journal.Journal), modules.OpenFilesystemJournal),
|
||||
Override(InitJournalKey, func(j journal.Journal) {
|
||||
journal.J = j // eagerly sets the global journal through fx.Invoke.
|
||||
}),
|
||||
|
||||
Override(new(helpers.MetricsCtx), context.Background),
|
||||
Override(new(record.Validator), modules.RecordValidator),
|
||||
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
|
||||
Override(new(dtypes.ShutdownChan), make(chan struct{})),
|
||||
Override(JournalKey, modules.SetupJournal),
|
||||
|
||||
// Filecoin modules
|
||||
|
||||
|
@ -300,7 +300,7 @@ func resolveOnce(bs blockstore.Blockstore) func(ctx context.Context, ds ipld.Nod
|
||||
return nil, nil, xerrors.Errorf("parsing int64: %w", err)
|
||||
}
|
||||
|
||||
ik := adt.IntKey(i)
|
||||
ik := abi.IntKey(i)
|
||||
|
||||
names[0] = "@H:" + ik.Key()
|
||||
}
|
||||
@ -311,7 +311,7 @@ func resolveOnce(bs blockstore.Blockstore) func(ctx context.Context, ds ipld.Nod
|
||||
return nil, nil, xerrors.Errorf("parsing uint64: %w", err)
|
||||
}
|
||||
|
||||
ik := adt.UIntKey(i)
|
||||
ik := abi.UIntKey(i)
|
||||
|
||||
names[0] = "@H:" + ik.Key()
|
||||
}
|
||||
|
@ -522,7 +522,7 @@ func (a *StateAPI) StateMarketParticipants(ctx context.Context, tsk types.TipSet
|
||||
return err
|
||||
}
|
||||
|
||||
if found, err := locked.Get(adt.AddrKey(a), &lk); err != nil {
|
||||
if found, err := locked.Get(abi.AddrKey(a), &lk); err != nil {
|
||||
return err
|
||||
} else if !found {
|
||||
return fmt.Errorf("locked funds not found")
|
||||
@ -617,7 +617,7 @@ func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.
|
||||
return xerrors.Errorf("address in state tree was not valid: %w", err)
|
||||
}
|
||||
|
||||
found, err := oh.Get(adt.AddrKey(addr), &ocval)
|
||||
found, err := oh.Get(abi.AddrKey(addr), &ocval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1163,7 +1163,7 @@ func (a *StateAPI) StateVerifiedClientStatus(ctx context.Context, addr address.A
|
||||
}
|
||||
|
||||
var dcap verifreg.DataCap
|
||||
if found, err := vh.Get(adt.AddrKey(aid), &dcap); err != nil {
|
||||
if found, err := vh.Get(abi.AddrKey(aid), &dcap); err != nil {
|
||||
return nil, err
|
||||
} else if !found {
|
||||
return nil, nil
|
||||
|
@ -26,7 +26,9 @@ import (
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/markets"
|
||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
@ -119,6 +121,10 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
c.SubscribeToEvents(marketevents.StorageClientLogger)
|
||||
|
||||
evtType := journal.J.RegisterEventType("markets/storage/client", "state_change")
|
||||
c.SubscribeToEvents(markets.StorageClientJournaler(evtType))
|
||||
|
||||
return c.Start(ctx)
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
@ -140,6 +146,10 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
||||
|
||||
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
|
||||
client.SubscribeToEvents(markets.RetrievalClientJournaler(evtType))
|
||||
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/gbrlsnchs/jwt/v3"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -20,7 +19,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/addrutil"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
@ -107,7 +105,3 @@ func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func SetupJournal(lr repo.LockedRepo) error {
|
||||
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
eventbus "github.com/libp2p/go-eventbus"
|
||||
@ -22,10 +24,12 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/sub"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||
"github.com/filecoin-project/lotus/node/hello"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error {
|
||||
@ -150,3 +154,16 @@ func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Sche
|
||||
|
||||
return shd, nil
|
||||
}
|
||||
|
||||
func OpenFilesystemJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled journal.DisabledEvents) (journal.Journal, error) {
|
||||
jrnl, err := journal.OpenFSJournal(lr, disabled)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error { return jrnl.Close() },
|
||||
})
|
||||
|
||||
return jrnl, err
|
||||
}
|
||||
|
@ -49,6 +49,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/markets"
|
||||
|
||||
lapi "github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -142,8 +144,34 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
|
||||
return &sidsc{sc}
|
||||
}
|
||||
|
||||
func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
|
||||
type StorageMinerParams struct {
|
||||
fx.In
|
||||
|
||||
Lifecycle fx.Lifecycle
|
||||
MetricsCtx helpers.MetricsCtx
|
||||
API lapi.FullNode
|
||||
Host host.Host
|
||||
MetadataDS dtypes.MetadataDS
|
||||
Sealer sectorstorage.SectorManager
|
||||
SectorIDCounter sealing.SectorIDCounter
|
||||
Verifier ffiwrapper.Verifier
|
||||
GetSealingConfigFn dtypes.GetSealingConfigFunc
|
||||
}
|
||||
|
||||
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
|
||||
return func(params StorageMinerParams) (*storage.Miner, error) {
|
||||
var (
|
||||
ds = params.MetadataDS
|
||||
mctx = params.MetricsCtx
|
||||
lc = params.Lifecycle
|
||||
api = params.API
|
||||
sealer = params.Sealer
|
||||
h = params.Host
|
||||
sc = params.SectorIDCounter
|
||||
verif = params.Verifier
|
||||
gsd = params.GetSealingConfigFn
|
||||
)
|
||||
|
||||
maddr, err := minerAddrFromDS(ds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -187,6 +215,10 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(context.Context) error {
|
||||
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
|
||||
|
||||
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
|
||||
m.SubscribeToEvents(markets.RetrievalProviderJournaler(evtType))
|
||||
|
||||
return m.Start()
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
@ -201,6 +233,10 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(context.Context) error {
|
||||
h.SubscribeToEvents(marketevents.StorageProviderLogger)
|
||||
|
||||
evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change")
|
||||
h.SubscribeToEvents(markets.StorageProviderJournaler(evtType))
|
||||
|
||||
return h.Start(ctx)
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
|
@ -192,7 +192,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a
|
||||
}
|
||||
|
||||
var pci miner.SectorPreCommitOnChainInfo
|
||||
ok, err := precommits.Get(adt.UIntKey(uint64(sectorNumber)), &pci)
|
||||
ok, err := precommits.Get(abi.UIntKey(uint64(sectorNumber)), &pci)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/gen"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
@ -50,6 +51,17 @@ type Miner struct {
|
||||
|
||||
getSealConfig dtypes.GetSealingConfigFunc
|
||||
sealing *sealing.Sealing
|
||||
|
||||
sealingEvtType journal.EventType
|
||||
}
|
||||
|
||||
// SealingStateEvt is a journal event that records a sector state transition.
|
||||
type SealingStateEvt struct {
|
||||
SectorNumber abi.SectorNumber
|
||||
SectorType abi.RegisteredSealProof
|
||||
From sealing.SectorState
|
||||
After sealing.SectorState
|
||||
Error string
|
||||
}
|
||||
|
||||
type storageMinerApi interface {
|
||||
@ -106,6 +118,7 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
|
||||
maddr: maddr,
|
||||
worker: worker,
|
||||
getSealConfig: gsd,
|
||||
sealingEvtType: journal.J.RegisterEventType("storage", "sealing_states"),
|
||||
}
|
||||
|
||||
return m, nil
|
||||
@ -129,13 +142,25 @@ func (m *Miner) Run(ctx context.Context) error {
|
||||
evts := events.NewEvents(ctx, m.api)
|
||||
adaptedAPI := NewSealingAPIAdapter(m.api)
|
||||
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
|
||||
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig))
|
||||
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications)
|
||||
|
||||
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
|
||||
journal.J.RecordEvent(m.sealingEvtType, func() interface{} {
|
||||
return SealingStateEvt{
|
||||
SectorNumber: before.SectorNumber,
|
||||
SectorType: before.SectorType,
|
||||
From: before.State,
|
||||
After: after.State,
|
||||
Error: after.LastErr,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Miner) Stop(ctx context.Context) error {
|
||||
return m.sealing.Stop(ctx)
|
||||
}
|
||||
|
75
storage/wdpost_journal.go
Normal file
75
storage/wdpost_journal.go
Normal file
@ -0,0 +1,75 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// SchedulerState defines the possible states in which the scheduler could be,
|
||||
// for the purposes of journalling.
|
||||
type SchedulerState string
|
||||
|
||||
const (
|
||||
// SchedulerStateStarted gets recorded when a WdPoSt cycle for an
|
||||
// epoch begins.
|
||||
SchedulerStateStarted = SchedulerState("started")
|
||||
// SchedulerStateAborted gets recorded when a WdPoSt cycle for an
|
||||
// epoch is aborted, normally because of a chain reorg or advancement.
|
||||
SchedulerStateAborted = SchedulerState("aborted")
|
||||
// SchedulerStateFaulted gets recorded when a WdPoSt cycle for an
|
||||
// epoch terminates abnormally, in which case the error is also recorded.
|
||||
SchedulerStateFaulted = SchedulerState("faulted")
|
||||
// SchedulerStateSucceeded gets recorded when a WdPoSt cycle for an
|
||||
// epoch ends successfully.
|
||||
SchedulerStateSucceeded = SchedulerState("succeeded")
|
||||
)
|
||||
|
||||
// Journal event types.
|
||||
const (
|
||||
evtTypeWdPoStScheduler = iota
|
||||
evtTypeWdPoStProofs
|
||||
evtTypeWdPoStRecoveries
|
||||
evtTypeWdPoStFaults
|
||||
)
|
||||
|
||||
// evtCommon is a common set of attributes for Windowed PoSt journal events.
|
||||
type evtCommon struct {
|
||||
Deadline *dline.Info
|
||||
Height abi.ChainEpoch
|
||||
TipSet []cid.Cid
|
||||
Error error `json:",omitempty"`
|
||||
}
|
||||
|
||||
// WdPoStSchedulerEvt is the journal event that gets recorded on scheduler
|
||||
// actions.
|
||||
type WdPoStSchedulerEvt struct {
|
||||
evtCommon
|
||||
State SchedulerState
|
||||
}
|
||||
|
||||
// WdPoStProofsProcessedEvt is the journal event that gets recorded when
|
||||
// Windowed PoSt proofs have been processed.
|
||||
type WdPoStProofsProcessedEvt struct {
|
||||
evtCommon
|
||||
Partitions []miner.PoStPartition
|
||||
MessageCID cid.Cid `json:",omitempty"`
|
||||
}
|
||||
|
||||
// WdPoStRecoveriesProcessedEvt is the journal event that gets recorded when
|
||||
// Windowed PoSt recoveries have been processed.
|
||||
type WdPoStRecoveriesProcessedEvt struct {
|
||||
evtCommon
|
||||
Declarations []miner.RecoveryDeclaration
|
||||
MessageCID cid.Cid `json:",omitempty"`
|
||||
}
|
||||
|
||||
// WdPoStFaultsProcessedEvt is the journal event that gets recorded when
|
||||
// Windowed PoSt faults have been processed.
|
||||
type WdPoStFaultsProcessedEvt struct {
|
||||
evtCommon
|
||||
Declarations []miner.FaultDeclaration
|
||||
MessageCID cid.Cid `json:",omitempty"`
|
||||
}
|
@ -14,6 +14,9 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -27,11 +30,19 @@ import (
|
||||
iminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
)
|
||||
|
||||
var errNoPartitions = errors.New("no partitions")
|
||||
|
||||
func (s *WindowPoStScheduler) failPost(deadline *dline.Info) {
|
||||
func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
State: SchedulerStateFaulted,
|
||||
}
|
||||
})
|
||||
|
||||
log.Errorf("TODO")
|
||||
/*s.failLk.Lock()
|
||||
if eps > s.failed {
|
||||
@ -46,27 +57,56 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *dline.Info,
|
||||
s.abort = abort
|
||||
s.activeDeadline = deadline
|
||||
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
State: SchedulerStateStarted,
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
defer abort()
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.doPost")
|
||||
defer span.End()
|
||||
|
||||
// recordProofsEvent records a successful proofs_processed event in the
|
||||
// journal, even if it was a noop (no partitions).
|
||||
recordProofsEvent := func(partitions []miner.PoStPartition, mcid cid.Cid) {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStProofs], func() interface{} {
|
||||
return &WdPoStProofsProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
Partitions: partitions,
|
||||
MessageCID: mcid,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
proof, err := s.runPost(ctx, *deadline, ts)
|
||||
switch err {
|
||||
case errNoPartitions:
|
||||
recordProofsEvent(nil, cid.Undef)
|
||||
return
|
||||
case nil:
|
||||
if err := s.submitPost(ctx, proof); err != nil {
|
||||
sm, err := s.submitPost(ctx, proof)
|
||||
if err != nil {
|
||||
log.Errorf("submitPost failed: %+v", err)
|
||||
s.failPost(deadline)
|
||||
s.failPost(err, deadline)
|
||||
return
|
||||
}
|
||||
recordProofsEvent(proof.Partitions, sm.Cid())
|
||||
default:
|
||||
log.Errorf("runPost failed: %+v", err)
|
||||
s.failPost(deadline)
|
||||
s.failPost(err, deadline)
|
||||
return
|
||||
}
|
||||
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
State: SchedulerStateSucceeded,
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
@ -115,16 +155,15 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
|
||||
return sbf, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []iminer.Partition) error {
|
||||
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []iminer.Partition) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries")
|
||||
defer span.End()
|
||||
|
||||
faulty := uint64(0)
|
||||
params := &miner.DeclareFaultsRecoveredParams{
|
||||
Recoveries: []miner.RecoveryDeclaration{},
|
||||
}
|
||||
|
||||
faulty := uint64(0)
|
||||
|
||||
for partIdx, partition := range partitions {
|
||||
faults, err := partition.FaultySectors()
|
||||
if err != nil {
|
||||
@ -136,12 +175,12 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
|
||||
}
|
||||
unrecovered, err := bitfield.SubtractBitField(faults, recovering)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("subtracting recovered set from fault set: %w", err)
|
||||
return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
|
||||
}
|
||||
|
||||
uc, err := unrecovered.Count()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("counting unrecovered sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("counting unrecovered sectors: %w", err)
|
||||
}
|
||||
|
||||
if uc == 0 {
|
||||
@ -152,13 +191,13 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
|
||||
|
||||
recovered, err := s.checkSectors(ctx, unrecovered)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("checking unrecovered sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("checking unrecovered sectors: %w", err)
|
||||
}
|
||||
|
||||
// if all sectors failed to recover, don't declare recoveries
|
||||
recoveredCount, err := recovered.Count()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("counting recovered sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("counting recovered sectors: %w", err)
|
||||
}
|
||||
|
||||
if recoveredCount == 0 {
|
||||
@ -172,17 +211,18 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
|
||||
})
|
||||
}
|
||||
|
||||
if len(params.Recoveries) == 0 {
|
||||
recoveries := params.Recoveries
|
||||
if len(recoveries) == 0 {
|
||||
if faulty != 0 {
|
||||
log.Warnw("No recoveries to declare", "deadline", dlIdx, "faulty", faulty)
|
||||
}
|
||||
|
||||
return nil
|
||||
return recoveries, nil, nil
|
||||
}
|
||||
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
return xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||
return recoveries, nil, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
@ -197,52 +237,51 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
|
||||
|
||||
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
return recoveries, sm, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
||||
|
||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||
}
|
||||
|
||||
if rec.Receipt.ExitCode != 0 {
|
||||
return xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
return recoveries, sm, xerrors.Errorf("declare faults recovered wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
return recoveries, sm, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []iminer.Partition) error {
|
||||
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []iminer.Partition) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults")
|
||||
defer span.End()
|
||||
|
||||
bad := uint64(0)
|
||||
params := &miner.DeclareFaultsParams{
|
||||
Faults: []miner.FaultDeclaration{},
|
||||
}
|
||||
|
||||
bad := uint64(0)
|
||||
|
||||
for _, partition := range partitions {
|
||||
for partIdx, partition := range partitions {
|
||||
toCheck, err := partition.ActiveSectors()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting active sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("getting active sectors: %w", err)
|
||||
}
|
||||
|
||||
good, err := s.checkSectors(ctx, toCheck)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("checking sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("checking sectors: %w", err)
|
||||
}
|
||||
|
||||
faulty, err := bitfield.SubtractBitField(toCheck, good)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("calculating faulty sector set: %w", err)
|
||||
return nil, nil, xerrors.Errorf("calculating faulty sector set: %w", err)
|
||||
}
|
||||
|
||||
c, err := faulty.Count()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("counting faulty sectors: %w", err)
|
||||
return nil, nil, xerrors.Errorf("counting faulty sectors: %w", err)
|
||||
}
|
||||
|
||||
if c == 0 {
|
||||
@ -258,15 +297,16 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
|
||||
})
|
||||
}
|
||||
|
||||
if len(params.Faults) == 0 {
|
||||
return nil
|
||||
faults := params.Faults
|
||||
if len(faults) == 0 {
|
||||
return faults, nil, nil
|
||||
}
|
||||
|
||||
log.Errorw("DETECTED FAULTY SECTORS, declaring faults", "count", bad)
|
||||
|
||||
enc, aerr := actors.SerializeParams(params)
|
||||
if aerr != nil {
|
||||
return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
|
||||
return faults, nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
@ -281,21 +321,21 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
|
||||
|
||||
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
return faults, sm, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
log.Warnw("declare faults Message CID", "cid", sm.Cid())
|
||||
|
||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("declare faults wait error: %w", err)
|
||||
return faults, sm, xerrors.Errorf("declare faults wait error: %w", err)
|
||||
}
|
||||
|
||||
if rec.Receipt.ExitCode != 0 {
|
||||
return xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
return faults, sm, xerrors.Errorf("declare faults wait non-0 exit code: %d", rec.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
return faults, sm, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) (*miner.SubmitWindowedPoStParams, error) {
|
||||
@ -335,15 +375,49 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
|
||||
var (
|
||||
sigmsg *types.SignedMessage
|
||||
recoveries []miner.RecoveryDeclaration
|
||||
faults []miner.FaultDeclaration
|
||||
|
||||
// optionalCid returns the CID of the message, or cid.Undef is the
|
||||
// message is nil. We don't need the argument (could capture the
|
||||
// pointer), but it's clearer and purer like that.
|
||||
optionalCid = func(sigmsg *types.SignedMessage) cid.Cid {
|
||||
if sigmsg == nil {
|
||||
return cid.Undef
|
||||
}
|
||||
return sigmsg.Cid()
|
||||
}
|
||||
)
|
||||
|
||||
if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions); err != nil {
|
||||
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
|
||||
log.Errorf("checking sector recoveries: %v", err)
|
||||
}
|
||||
|
||||
if err := s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStRecoveries], func() interface{} {
|
||||
j := WdPoStRecoveriesProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
Declarations: recoveries,
|
||||
MessageCID: optionalCid(sigmsg),
|
||||
}
|
||||
j.Error = err
|
||||
return j
|
||||
})
|
||||
|
||||
if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions); err != nil {
|
||||
// TODO: This is also potentially really bad, but we try to post anyways
|
||||
log.Errorf("checking sector faults: %v", err)
|
||||
}
|
||||
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStFaults], func() interface{} {
|
||||
return WdPoStFaultsProcessedEvt{
|
||||
evtCommon: s.getEvtCommon(err),
|
||||
Declarations: faults,
|
||||
MessageCID: optionalCid(sigmsg),
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
@ -396,16 +470,16 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
|
||||
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
|
||||
}
|
||||
|
||||
toProve, err = bitfield.SubtractBitField(toProve, postSkipped)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
|
||||
}
|
||||
|
||||
good, err := s.checkSectors(ctx, toProve)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
|
||||
}
|
||||
|
||||
good, err = bitfield.SubtractBitField(good, postSkipped)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
|
||||
}
|
||||
|
||||
skipped, err := bitfield.SubtractBitField(toProve, good)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("toProve - good: %w", err)
|
||||
@ -537,13 +611,15 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors,
|
||||
return proofSectors, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) error {
|
||||
func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "storage.commitPost")
|
||||
defer span.End()
|
||||
|
||||
var sm *types.SignedMessage
|
||||
|
||||
enc, aerr := actors.SerializeParams(proof)
|
||||
if aerr != nil {
|
||||
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
|
||||
return nil, xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
@ -558,8 +634,9 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
|
||||
|
||||
// TODO: consider maybe caring about the output
|
||||
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
return nil, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Submitted window post: %s", sm.Cid())
|
||||
@ -578,7 +655,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
|
||||
log.Errorf("Submitting window post %s failed: exit %d", sm.Cid(), rec.Receipt.ExitCode)
|
||||
}()
|
||||
|
||||
return nil
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) {
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
@ -42,6 +43,8 @@ type WindowPoStScheduler struct {
|
||||
activeDeadline *dline.Info
|
||||
abort context.CancelFunc
|
||||
|
||||
evtTypes [4]journal.EventType
|
||||
|
||||
// failed abi.ChainEpoch // eps
|
||||
// failLk sync.Mutex
|
||||
}
|
||||
@ -67,6 +70,12 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
|
||||
|
||||
actor: actor,
|
||||
worker: worker,
|
||||
evtTypes: [...]journal.EventType{
|
||||
evtTypeWdPoStScheduler: journal.J.RegisterEventType("wdpost", "scheduler"),
|
||||
evtTypeWdPoStProofs: journal.J.RegisterEventType("wdpost", "proofs_processed"),
|
||||
evtTypeWdPoStRecoveries: journal.J.RegisterEventType("wdpost", "recoveries_processed"),
|
||||
evtTypeWdPoStFaults: journal.J.RegisterEventType("wdpost", "faults_processed"),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -112,12 +121,13 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
|
||||
log.Errorf("expected first notif to have len = 1")
|
||||
continue
|
||||
}
|
||||
if changes[0].Type != store.HCCurrent {
|
||||
chg := changes[0]
|
||||
if chg.Type != store.HCCurrent {
|
||||
log.Errorf("expected first notif to tell current ts")
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.update(ctx, changes[0].Val); err != nil {
|
||||
if err := s.update(ctx, chg.Val); err != nil {
|
||||
log.Errorf("%+v", err)
|
||||
}
|
||||
|
||||
@ -221,10 +231,29 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
|
||||
|
||||
if s.abort != nil {
|
||||
s.abort()
|
||||
|
||||
journal.J.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
|
||||
return WdPoStSchedulerEvt{
|
||||
evtCommon: s.getEvtCommon(nil),
|
||||
State: SchedulerStateAborted,
|
||||
}
|
||||
})
|
||||
|
||||
log.Warnf("Aborting Window PoSt (Deadline: %+v)", s.activeDeadline)
|
||||
}
|
||||
|
||||
s.activeDeadline = nil
|
||||
s.abort = nil
|
||||
}
|
||||
|
||||
// getEvtCommon populates and returns common attributes from state, for a
|
||||
// WdPoSt journal event.
|
||||
func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon {
|
||||
c := evtCommon{Error: err}
|
||||
if s.cur != nil {
|
||||
c.Deadline = s.activeDeadline
|
||||
c.Height = s.cur.Height()
|
||||
c.TipSet = s.cur.Cids()
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user