From d547c2588c0b84f0660f1332aa9e0ae4f0509483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 21 Jul 2020 17:26:44 +0100 Subject: [PATCH] record deals events in journal. --- markets/storageadapter/client.go | 26 +++++++++- markets/storageadapter/journal_events.go | 64 ++++++++++++++++++++++++ markets/storageadapter/provider.go | 49 ++++++++++++++++-- 3 files changed, 132 insertions(+), 7 deletions(-) create mode 100644 markets/storageadapter/journal_events.go diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 2bb762e28..1da85fa69 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -7,6 +7,7 @@ import ( "context" "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/lotus/journal" "golang.org/x/xerrors" @@ -43,6 +44,9 @@ type ClientNodeAdapter struct { cs *store.ChainStore fm *market.FundMgr ev *events.Events + + jrnl journal.Journal + evtTypes [4]journal.EventType } type clientApi struct { @@ -50,7 +54,7 @@ type clientApi struct { full.StateAPI } -func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode { +func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr, jrnl journal.Journal) storagemarket.StorageClientNode { return &ClientNodeAdapter{ StateAPI: state, ChainAPI: chain, @@ -60,6 +64,13 @@ func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.M cs: cs, fm: fm, ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), + + jrnl: jrnl, + evtTypes: [...]journal.EventType{ + evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:client", "deal_sector_committed"), + evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:client", "deal_expired"), + evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:client", "deal_slashed"), + }, } } @@ -235,7 +246,14 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor return 0, err } - return res.IDs[dealIdx], nil + dealID := res.IDs[dealIdx] + journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} { + deal := deal // copy and strip fields we don't want to log to the journal + deal.ClientSignature = crypto.Signature{} + return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()} + }) + + return dealID, nil } func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId abi.DealID, cb storagemarket.DealSectorCommittedCallback) error { @@ -278,6 +296,10 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch) + journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH} + }) + cb(nil) return false, nil diff --git a/markets/storageadapter/journal_events.go b/markets/storageadapter/journal_events.go new file mode 100644 index 000000000..6192738df --- /dev/null +++ b/markets/storageadapter/journal_events.go @@ -0,0 +1,64 @@ +package storageadapter + +import ( + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" +) + +// Journal entry types emitted from this module. +const ( + evtTypeDealAccepted = iota + evtTypeDealSectorCommitted + evtTypeDealExpired + evtTypeDealSlashed +) + +type ClientDealAcceptedEvt struct { + ID abi.DealID + Deal storagemarket.ClientDeal + Height abi.ChainEpoch +} + +type ClientDealSectorCommittedEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} + +type ClientDealExpiredEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} + +type ClientDealSlashedEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} + +type MinerDealAcceptedEvt struct { + ID abi.DealID + Deal storagemarket.MinerDeal + State market.DealState + Height abi.ChainEpoch +} + +type MinerDealSectorCommittedEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} + +type MinerDealExpiredEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} + +type MinerDealSlashedEvt struct { + ID abi.DealID + State market.DealState + Height abi.ChainEpoch +} diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 9e1a101cc..2aa9c45df 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -45,14 +46,25 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events + + jrnl journal.Journal + evtTypes [4]journal.EventType } -func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { +func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, jrnl journal.Journal) storagemarket.StorageProviderNode { return &ProviderNodeAdapter{ FullNode: full, dag: dag, secb: secb, ev: events.NewEvents(context.TODO(), full), + + jrnl: jrnl, + evtTypes: [...]journal.EventType{ + evtTypeDealAccepted: jrnl.RegisterEventType("markets:storage:provider", "deal_complete"), + evtTypeDealSectorCommitted: jrnl.RegisterEventType("markets:storage:provider", "deal_sector_committed"), + evtTypeDealExpired: jrnl.RegisterEventType("markets:storage:provider", "deal_expired"), + evtTypeDealSlashed: jrnl.RegisterEventType("markets:storage:provider", "deal_slashed"), + }, } } @@ -100,8 +112,15 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema if err != nil { return xerrors.Errorf("AddPiece failed: %s", err) } + log.Warnf("New Deal: deal %d", deal.DealID) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} { + deal := deal // copy and strip fields we don't want to log to the journal + deal.ClientSignature = crypto.Signature{} + return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal} + }) + return nil } @@ -268,6 +287,10 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH} + }) + cb(nil) return false, nil @@ -369,15 +392,23 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Called immediately to check if the deal has already expired or been slashed checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + height := ts.Height() + // Check if the deal has already expired - if sd.Proposal.EndEpoch <= ts.Height() { + if sd.Proposal.EndEpoch <= height { onDealExpired(nil) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} + }) return true, false, nil } // If there is no deal assume it's already been slashed if sd.State.SectorStartEpoch < 0 { - onDealSlashed(ts.Height(), nil) + onDealSlashed(height, nil) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} + }) return true, false, nil } @@ -389,9 +420,14 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Called when there was a match against the state change we're looking for // and the chain has advanced to the confidence height stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { + height := ts2.Height() + // Check if the deal has already expired - if sd.Proposal.EndEpoch <= ts2.Height() { + if sd.Proposal.EndEpoch <= height { onDealExpired(nil) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} + }) return false, nil } @@ -414,7 +450,10 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Deal was slashed if deal.To == nil { - onDealSlashed(ts2.Height(), nil) + onDealSlashed(height, nil) + journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} + }) return false, nil }