From 09e9d6d7781a221b842d34fd43f49e172f387419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 14 Sep 2020 16:20:01 +0100 Subject: [PATCH] deal journal events: wire into markets subscriptions. --- markets/loggers/journal.go | 76 ++++++++++++++++++++++++ markets/storageadapter/client.go | 23 +------ markets/storageadapter/journal_events.go | 64 -------------------- markets/storageadapter/provider.go | 45 ++------------ node/modules/client.go | 9 +++ node/modules/storageminer.go | 9 +++ 6 files changed, 99 insertions(+), 127 deletions(-) create mode 100644 markets/loggers/journal.go delete mode 100644 markets/storageadapter/journal_events.go diff --git a/markets/loggers/journal.go b/markets/loggers/journal.go new file mode 100644 index 000000000..12948fe7b --- /dev/null +++ b/markets/loggers/journal.go @@ -0,0 +1,76 @@ +package marketevents + +import ( + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/storagemarket" + + "github.com/filecoin-project/lotus/journal" +) + +type StorageClientEvt struct { + Event string + Deal storagemarket.ClientDeal +} + +type StorageProviderEvt struct { + Event string + Deal storagemarket.MinerDeal +} + +type RetrievalClientEvt struct { + Event string + Deal retrievalmarket.ClientDealState +} + +type RetrievalProviderEvt struct { + Event string + Deal retrievalmarket.ProviderDealState +} + +// StorageClientJournaler records journal events from the storage client. +func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + journal.J.RecordEvent(evtType, func() interface{} { + return StorageClientEvt{ + Event: storagemarket.ClientEvents[event], + Deal: deal, + } + }) + } +} + +// StorageProviderJournaler records journal events from the storage provider. +func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + journal.J.RecordEvent(evtType, func() interface{} { + return StorageProviderEvt{ + Event: storagemarket.ProviderEvents[event], + Deal: deal, + } + }) + } +} + +// RetrievalClientJournaler records journal events from the retrieval client. +func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) { + return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) { + journal.J.RecordEvent(evtType, func() interface{} { + return RetrievalClientEvt{ + Event: retrievalmarket.ClientEvents[event], + Deal: deal, + } + }) + } +} + +// RetrievalProviderJournaler records journal events from the retrieval provider. +func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { + return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { + journal.J.RecordEvent(evtType, func() interface{} { + return RetrievalProviderEvt{ + Event: retrievalmarket.ProviderEvents[event], + Deal: deal, + } + }) + } +} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index f66fd3ef9..4168792da 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -24,7 +24,6 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" @@ -43,9 +42,6 @@ type ClientNodeAdapter struct { cs *store.ChainStore fm *market.FundMgr ev *events.Events - - // index 0 is unused, as it corresponds to evtTypeDealAccepted, a provider-only event. - evtTypes [4]journal.EventType } type clientApi struct { @@ -63,12 +59,6 @@ func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.M cs: cs, fm: fm, ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), - - evtTypes: [...]journal.EventType{ - evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:client", "deal_sector_committed"), - evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:client", "deal_expired"), - evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:client", "deal_slashed"), - }, } } @@ -233,14 +223,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor return 0, err } - dealID := res.IDs[dealIdx] - journal.J.RecordEvent(c.evtTypes[evtTypeDealAccepted], func() interface{} { - deal := deal // copy and strip fields we don't want to log to the journal - deal.ClientSignature = crypto.Signature{} - return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()} - }) - - return dealID, nil + return res.IDs[dealIdx], nil } const clientOverestimation = 2 @@ -294,10 +277,6 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch) - journal.J.RecordEvent(c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { - return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH} - }) - cb(nil) return false, nil diff --git a/markets/storageadapter/journal_events.go b/markets/storageadapter/journal_events.go deleted file mode 100644 index b74c50301..000000000 --- a/markets/storageadapter/journal_events.go +++ /dev/null @@ -1,64 +0,0 @@ -package storageadapter - -import ( - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/specs-actors/actors/builtin/market" -) - -// Journal entry types emitted from this module. -const ( - evtTypeDealAccepted = iota - evtTypeDealSectorCommitted - evtTypeDealExpired - evtTypeDealSlashed -) - -type ClientDealAcceptedEvt struct { - ID abi.DealID - Deal storagemarket.ClientDeal - Height abi.ChainEpoch -} - -type ClientDealSectorCommittedEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} - -type ClientDealExpiredEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} - -type ClientDealSlashedEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} - -type MinerDealAcceptedEvt struct { - ID abi.DealID - Deal storagemarket.MinerDeal - State market.DealState - Height abi.ChainEpoch -} - -type MinerDealSectorCommittedEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} - -type MinerDealExpiredEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} - -type MinerDealSlashedEvt struct { - ID abi.DealID - State market.DealState - Height abi.ChainEpoch -} diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 5b8ead5be..7af1808c1 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -30,7 +30,6 @@ import ( "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -49,8 +48,6 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events - - evtTypes [4]journal.EventType } func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { @@ -59,13 +56,6 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc dag: dag, secb: secb, ev: events.NewEvents(context.TODO(), full), - - evtTypes: [...]journal.EventType{ - evtTypeDealAccepted: journal.J.RegisterEventType("markets:storage:provider", "deal_complete"), - evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:provider", "deal_sector_committed"), - evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:provider", "deal_expired"), - evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:provider", "deal_slashed"), - }, } } @@ -132,15 +122,8 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema if err != nil { return nil, xerrors.Errorf("AddPiece failed: %s", err) } - log.Warnf("New Deal: deal %d", deal.DealID) - journal.J.RecordEvent(n.evtTypes[evtTypeDealAccepted], func() interface{} { - deal := deal // copy and strip fields we don't want to log to the journal - deal.ClientSignature = crypto.Signature{} - return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal} - }) - return &storagemarket.PackingResult{ SectorNumber: p, Offset: offset, @@ -319,10 +302,6 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) - journal.J.RecordEvent(n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { - return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH} - }) - cb(nil) return false, nil @@ -429,23 +408,15 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID return false, true, nil } - height := ts.Height() - // Check if the deal has already expired - if sd.Proposal.EndEpoch <= height { + if sd.Proposal.EndEpoch <= ts.Height() { onDealExpired(nil) - journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} { - return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} - }) return true, false, nil } // If there is no deal assume it's already been slashed if sd.State.SectorStartEpoch < 0 { - onDealSlashed(height, nil) - journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} { - return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} - }) + onDealSlashed(ts.Height(), nil) return true, false, nil } @@ -457,14 +428,9 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Called when there was a match against the state change we're looking for // and the chain has advanced to the confidence height stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { - height := ts2.Height() - // Check if the deal has already expired - if sd.Proposal.EndEpoch <= height { + if sd.Proposal.EndEpoch <= ts2.Height() { onDealExpired(nil) - journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} { - return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} - }) return false, nil } @@ -487,10 +453,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Deal was slashed if deal.To == nil { - onDealSlashed(height, nil) - journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} { - return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} - }) + onDealSlashed(ts2.Height(), nil) return false, nil } diff --git a/node/modules/client.go b/node/modules/client.go index 63633c0e3..7d1ac942e 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -26,6 +26,7 @@ import ( "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-core/host" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/blockstore" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/markets/retrievaladapter" @@ -119,6 +120,10 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { c.SubscribeToEvents(marketevents.StorageClientLogger) + + evtType := journal.J.RegisterEventType("markets/storage/client", "state_change") + c.SubscribeToEvents(marketevents.StorageClientJournaler(evtType)) + return c.Start(ctx) }, OnStop: func(context.Context) error { @@ -140,6 +145,10 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { client.SubscribeToEvents(marketevents.RetrievalClientLogger) + + evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change") + client.SubscribeToEvents(marketevents.RetrievalClientJournaler(evtType)) + return nil }, }) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 5f00fffc0..eb512da07 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -49,6 +49,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/journal" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -213,6 +214,10 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva lc.Append(fx.Hook{ OnStart: func(context.Context) error { m.SubscribeToEvents(marketevents.RetrievalProviderLogger) + + evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change") + m.SubscribeToEvents(marketevents.RetrievalProviderJournaler(evtType)) + return m.Start() }, OnStop: func(context.Context) error { @@ -227,6 +232,10 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto lc.Append(fx.Hook{ OnStart: func(context.Context) error { h.SubscribeToEvents(marketevents.StorageProviderLogger) + + evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change") + h.SubscribeToEvents(marketevents.StorageProviderJournaler(evtType)) + return h.Start(ctx) }, OnStop: func(context.Context) error {