record deals events in journal.

This commit is contained in:
Raúl Kripalani 2020-07-21 17:26:44 +01:00
parent cb8e209f78
commit d547c2588c
3 changed files with 132 additions and 7 deletions

View File

@ -7,6 +7,7 @@ import (
"context"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/journal"
"golang.org/x/xerrors"
@ -43,6 +44,9 @@ type ClientNodeAdapter struct {
cs *store.ChainStore
fm *market.FundMgr
ev *events.Events
jrnl journal.Journal
evtTypes [4]journal.EventType
}
type clientApi struct {
@ -50,7 +54,7 @@ type clientApi struct {
full.StateAPI
}
func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode {
func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr, jrnl journal.Journal) storagemarket.StorageClientNode {
return &ClientNodeAdapter{
StateAPI: state,
ChainAPI: chain,
@ -60,6 +64,13 @@ func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.M
cs: cs,
fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
jrnl: jrnl,
evtTypes: [...]journal.EventType{
evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:client", "deal_sector_committed"),
evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:client", "deal_expired"),
evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:client", "deal_slashed"),
},
}
}
@ -235,7 +246,14 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
return 0, err
}
return res.IDs[dealIdx], nil
dealID := res.IDs[dealIdx]
journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()}
})
return dealID, nil
}
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {
@ -278,6 +296,10 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch)
journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH}
})
cb(nil)
return false, nil

View File

@ -0,0 +1,64 @@
package storageadapter
import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
)
// Journal entry types emitted from this module.
const (
evtTypeDealAccepted = iota
evtTypeDealSectorCommitted
evtTypeDealExpired
evtTypeDealSlashed
)
type ClientDealAcceptedEvt struct {
ID abi.DealID
Deal storagemarket.ClientDeal
Height abi.ChainEpoch
}
type ClientDealSectorCommittedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type ClientDealExpiredEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type ClientDealSlashedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealAcceptedEvt struct {
ID abi.DealID
Deal storagemarket.MinerDeal
State market.DealState
Height abi.ChainEpoch
}
type MinerDealSectorCommittedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealExpiredEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealSlashedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}

View File

@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -45,14 +46,25 @@ type ProviderNodeAdapter struct {
secb *sectorblocks.SectorBlocks
ev *events.Events
jrnl journal.Journal
evtTypes [4]journal.EventType
}
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, jrnl journal.Journal) storagemarket.StorageProviderNode {
return &ProviderNodeAdapter{
FullNode: full,
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
jrnl: jrnl,
evtTypes: [...]journal.EventType{
evtTypeDealAccepted: jrnl.RegisterEventType("markets:storage:provider", "deal_complete"),
evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:provider", "deal_sector_committed"),
evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:provider", "deal_expired"),
evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:provider", "deal_slashed"),
},
}
}
@ -100,8 +112,15 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
if err != nil {
return xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Deal: deal %d", deal.DealID)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal}
})
return nil
}
@ -268,6 +287,10 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH}
})
cb(nil)
return false, nil
@ -369,15 +392,23 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
height := ts.Height()
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() {
if sd.Proposal.EndEpoch <= height {
onDealExpired(nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil
}
// If there is no deal assume it's already been slashed
if sd.State.SectorStartEpoch < 0 {
onDealSlashed(ts.Height(), nil)
onDealSlashed(height, nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil
}
@ -389,9 +420,14 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
height := ts2.Height()
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() {
if sd.Proposal.EndEpoch <= height {
onDealExpired(nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil
}
@ -414,7 +450,10 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Deal was slashed
if deal.To == nil {
onDealSlashed(ts2.Height(), nil)
onDealSlashed(height, nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil
}