deal journal events: wire into markets subscriptions.

This commit is contained in:
Raúl Kripalani 2020-09-14 16:20:01 +01:00
parent 05aa5f2d38
commit 09e9d6d778
6 changed files with 99 additions and 127 deletions

View File

@ -0,0 +1,76 @@
package marketevents
import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/journal"
)
type StorageClientEvt struct {
Event string
Deal storagemarket.ClientDeal
}
type StorageProviderEvt struct {
Event string
Deal storagemarket.MinerDeal
}
type RetrievalClientEvt struct {
Event string
Deal retrievalmarket.ClientDealState
}
type RetrievalProviderEvt struct {
Event string
Deal retrievalmarket.ProviderDealState
}
// StorageClientJournaler records journal events from the storage client.
func StorageClientJournaler(evtType journal.EventType) func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
return func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
return StorageClientEvt{
Event: storagemarket.ClientEvents[event],
Deal: deal,
}
})
}
}
// StorageProviderJournaler records journal events from the storage provider.
func StorageProviderJournaler(evtType journal.EventType) func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
return func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
journal.J.RecordEvent(evtType, func() interface{} {
return StorageProviderEvt{
Event: storagemarket.ProviderEvents[event],
Deal: deal,
}
})
}
}
// RetrievalClientJournaler records journal events from the retrieval client.
func RetrievalClientJournaler(evtType journal.EventType) func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
return func(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
return RetrievalClientEvt{
Event: retrievalmarket.ClientEvents[event],
Deal: deal,
}
})
}
}
// RetrievalProviderJournaler records journal events from the retrieval provider.
func RetrievalProviderJournaler(evtType journal.EventType) func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
return func(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
journal.J.RecordEvent(evtType, func() interface{} {
return RetrievalProviderEvt{
Event: retrievalmarket.ProviderEvents[event],
Deal: deal,
}
})
}
}

View File

@ -24,7 +24,6 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
@ -43,9 +42,6 @@ type ClientNodeAdapter struct {
cs *store.ChainStore cs *store.ChainStore
fm *market.FundMgr fm *market.FundMgr
ev *events.Events ev *events.Events
// index 0 is unused, as it corresponds to evtTypeDealAccepted, a provider-only event.
evtTypes [4]journal.EventType
} }
type clientApi struct { type clientApi struct {
@ -63,12 +59,6 @@ func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.M
cs: cs, cs: cs,
fm: fm, fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
evtTypes: [...]journal.EventType{
evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:client", "deal_sector_committed"),
evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:client", "deal_expired"),
evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:client", "deal_slashed"),
},
} }
} }
@ -233,14 +223,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
return 0, err return 0, err
} }
dealID := res.IDs[dealIdx] return res.IDs[dealIdx], nil
journal.J.RecordEvent(c.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()}
})
return dealID, nil
} }
const clientOverestimation = 2 const clientOverestimation = 2
@ -294,10 +277,6 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch) log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch)
journal.J.RecordEvent(c.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH}
})
cb(nil) cb(nil)
return false, nil return false, nil

View File

@ -1,64 +0,0 @@
package storageadapter
import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
)
// Journal entry types emitted from this module.
const (
evtTypeDealAccepted = iota
evtTypeDealSectorCommitted
evtTypeDealExpired
evtTypeDealSlashed
)
type ClientDealAcceptedEvt struct {
ID abi.DealID
Deal storagemarket.ClientDeal
Height abi.ChainEpoch
}
type ClientDealSectorCommittedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type ClientDealExpiredEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type ClientDealSlashedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealAcceptedEvt struct {
ID abi.DealID
Deal storagemarket.MinerDeal
State market.DealState
Height abi.ChainEpoch
}
type MinerDealSectorCommittedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealExpiredEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}
type MinerDealSlashedEvt struct {
ID abi.DealID
State market.DealState
Height abi.ChainEpoch
}

View File

@ -30,7 +30,6 @@ import (
"github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -49,8 +48,6 @@ type ProviderNodeAdapter struct {
secb *sectorblocks.SectorBlocks secb *sectorblocks.SectorBlocks
ev *events.Events ev *events.Events
evtTypes [4]journal.EventType
} }
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
@ -59,13 +56,6 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc
dag: dag, dag: dag,
secb: secb, secb: secb,
ev: events.NewEvents(context.TODO(), full), ev: events.NewEvents(context.TODO(), full),
evtTypes: [...]journal.EventType{
evtTypeDealAccepted: journal.J.RegisterEventType("markets:storage:provider", "deal_complete"),
evtTypeDealSectorCommitted: journal.J.RegisterEventType("markets:storage:provider", "deal_sector_committed"),
evtTypeDealExpired: journal.J.RegisterEventType("markets:storage:provider", "deal_expired"),
evtTypeDealSlashed: journal.J.RegisterEventType("markets:storage:provider", "deal_slashed"),
},
} }
} }
@ -132,15 +122,8 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
if err != nil { if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err) return nil, xerrors.Errorf("AddPiece failed: %s", err)
} }
log.Warnf("New Deal: deal %d", deal.DealID) log.Warnf("New Deal: deal %d", deal.DealID)
journal.J.RecordEvent(n.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal}
})
return &storagemarket.PackingResult{ return &storagemarket.PackingResult{
SectorNumber: p, SectorNumber: p,
Offset: offset, Offset: offset,
@ -319,10 +302,6 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
journal.J.RecordEvent(n.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH}
})
cb(nil) cb(nil)
return false, nil return false, nil
@ -429,23 +408,15 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
return false, true, nil return false, true, nil
} }
height := ts.Height()
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= height { if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil) onDealExpired(nil)
journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil return true, false, nil
} }
// If there is no deal assume it's already been slashed // If there is no deal assume it's already been slashed
if sd.State.SectorStartEpoch < 0 { if sd.State.SectorStartEpoch < 0 {
onDealSlashed(height, nil) onDealSlashed(ts.Height(), nil)
journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil return true, false, nil
} }
@ -457,14 +428,9 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Called when there was a match against the state change we're looking for // Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height // and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
height := ts2.Height()
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= height { if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil) onDealExpired(nil)
journal.J.RecordEvent(n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil return false, nil
} }
@ -487,10 +453,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Deal was slashed // Deal was slashed
if deal.To == nil { if deal.To == nil {
onDealSlashed(height, nil) onDealSlashed(ts2.Height(), nil)
journal.J.RecordEvent(n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil return false, nil
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/blockstore"
marketevents "github.com/filecoin-project/lotus/markets/loggers" marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/markets/retrievaladapter"
@ -119,6 +120,10 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
c.SubscribeToEvents(marketevents.StorageClientLogger) c.SubscribeToEvents(marketevents.StorageClientLogger)
evtType := journal.J.RegisterEventType("markets/storage/client", "state_change")
c.SubscribeToEvents(marketevents.StorageClientJournaler(evtType))
return c.Start(ctx) return c.Start(ctx)
}, },
OnStop: func(context.Context) error { OnStop: func(context.Context) error {
@ -140,6 +145,10 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger) client.SubscribeToEvents(marketevents.RetrievalClientLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(marketevents.RetrievalClientJournaler(evtType))
return nil return nil
}, },
}) })

View File

@ -49,6 +49,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/journal"
lapi "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
@ -213,6 +214,10 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
m.SubscribeToEvents(marketevents.RetrievalProviderLogger) m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
m.SubscribeToEvents(marketevents.RetrievalProviderJournaler(evtType))
return m.Start() return m.Start()
}, },
OnStop: func(context.Context) error { OnStop: func(context.Context) error {
@ -227,6 +232,10 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
h.SubscribeToEvents(marketevents.StorageProviderLogger) h.SubscribeToEvents(marketevents.StorageProviderLogger)
evtType := journal.J.RegisterEventType("markets/storage/provider", "state_change")
h.SubscribeToEvents(marketevents.StorageProviderJournaler(evtType))
return h.Start(ctx) return h.Start(ctx)
}, },
OnStop: func(context.Context) error { OnStop: func(context.Context) error {