Merge pull request #5309 from filecoin-project/feat/deal-batch-publish
batch publish deal messages
This commit is contained in:
commit
ba571794dc
@ -444,10 +444,10 @@ type GatewayStruct struct {
|
||||
StateMinerProvingDeadline func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
|
||||
StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
|
||||
StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
|
||||
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
|
||||
StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
|
||||
StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error)
|
||||
StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error)
|
||||
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
|
||||
StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
|
||||
StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
|
||||
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
|
||||
|
@ -20,9 +20,13 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/markets/storageadapter"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
dag "github.com/ipfs/go-merkledag"
|
||||
dstest "github.com/ipfs/go-merkledag/test"
|
||||
@ -88,6 +92,97 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
|
||||
return res, data, nil
|
||||
}
|
||||
|
||||
func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
|
||||
publishPeriod := 10 * time.Second
|
||||
maxDealsPerMsg := uint64(2)
|
||||
|
||||
// Set max deals per publish deals message to 2
|
||||
minerDef := []StorageMiner{{
|
||||
Full: 0,
|
||||
Opts: node.Override(
|
||||
new(*storageadapter.DealPublisher),
|
||||
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
|
||||
Period: publishPeriod,
|
||||
MaxDealsPerMsg: maxDealsPerMsg,
|
||||
})),
|
||||
Preseal: PresealGenesis,
|
||||
}}
|
||||
|
||||
// Create a connect client and miner node
|
||||
n, sn := b(t, OneFull, minerDef)
|
||||
client := n[0].FullNode.(*impl.FullNodeAPI)
|
||||
miner := sn[0]
|
||||
s := connectAndStartMining(t, b, blocktime, client, miner)
|
||||
defer s.blockMiner.Stop()
|
||||
|
||||
// Starts a deal and waits until it's published
|
||||
runDealTillPublish := func(rseed int) {
|
||||
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
|
||||
require.NoError(t, err)
|
||||
|
||||
upds, err := client.ClientGetDealUpdates(s.ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
|
||||
|
||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||
time.Sleep(time.Second)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for upd := range upds {
|
||||
if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit {
|
||||
done <- struct{}{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
<-done
|
||||
}
|
||||
|
||||
// Run three deals in parallel
|
||||
done := make(chan struct{}, maxDealsPerMsg+1)
|
||||
for rseed := 1; rseed <= 3; rseed++ {
|
||||
rseed := rseed
|
||||
go func() {
|
||||
runDealTillPublish(rseed)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for two of the deals to be published
|
||||
for i := 0; i < int(maxDealsPerMsg); i++ {
|
||||
<-done
|
||||
}
|
||||
|
||||
// Expect a single PublishStorageDeals message that includes the first two deals
|
||||
msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
|
||||
require.NoError(t, err)
|
||||
count := 0
|
||||
for _, msgCid := range msgCids {
|
||||
msg, err := s.client.ChainGetMessage(s.ctx, msgCid)
|
||||
require.NoError(t, err)
|
||||
|
||||
if msg.Method == market.Methods.PublishStorageDeals {
|
||||
count++
|
||||
var pubDealsParams market2.PublishStorageDealsParams
|
||||
err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
|
||||
}
|
||||
}
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
// The third deal should be published once the publish period expires.
|
||||
// Allow a little padding as it takes a moment for the state change to
|
||||
// be noticed by the client.
|
||||
padding := 10 * time.Second
|
||||
select {
|
||||
case <-time.After(publishPeriod + padding):
|
||||
require.Fail(t, "Expected 3rd deal to be published once publish period elapsed")
|
||||
case <-done: // Success
|
||||
}
|
||||
}
|
||||
|
||||
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
|
||||
s := setupOneClientOneMiner(t, b, blocktime)
|
||||
defer s.blockMiner.Stop()
|
||||
|
@ -59,6 +59,7 @@ const GenesisPreseals = 2
|
||||
// Options for setting up a mock storage miner
|
||||
type StorageMiner struct {
|
||||
Full int
|
||||
Opts node.Option
|
||||
Preseal int
|
||||
}
|
||||
|
||||
|
@ -99,11 +99,13 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
|
||||
} else {
|
||||
log.Warn("listenHeadChanges quit")
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
select {
|
||||
case <-build.Clock.After(time.Second):
|
||||
case <-ctx.Done():
|
||||
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
|
||||
return
|
||||
}
|
||||
build.Clock.Sleep(time.Second)
|
||||
|
||||
log.Info("restarting listenHeadChanges")
|
||||
}
|
||||
}
|
||||
|
@ -6,11 +6,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -31,15 +32,16 @@ import (
|
||||
"github.com/filecoin-project/lotus/lib/sigs"
|
||||
"github.com/filecoin-project/lotus/markets/utils"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
)
|
||||
|
||||
type ClientNodeAdapter struct {
|
||||
*clientApi
|
||||
*apiWrapper
|
||||
|
||||
fundmgr *market.FundManager
|
||||
ev *events.Events
|
||||
dsMatcher *dealStateMatcher
|
||||
scMgr *SectorCommittedManager
|
||||
}
|
||||
|
||||
type clientApi struct {
|
||||
@ -48,16 +50,20 @@ type clientApi struct {
|
||||
full.MpoolAPI
|
||||
}
|
||||
|
||||
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
|
||||
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
|
||||
capi := &clientApi{chain, stateapi, mpool}
|
||||
return &ClientNodeAdapter{
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
ev := events.NewEvents(ctx, capi)
|
||||
a := &ClientNodeAdapter{
|
||||
clientApi: capi,
|
||||
apiWrapper: &apiWrapper{api: capi},
|
||||
|
||||
fundmgr: fundmgr,
|
||||
ev: events.NewEvents(context.TODO(), capi),
|
||||
ev: ev,
|
||||
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
|
||||
}
|
||||
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
|
||||
return a
|
||||
}
|
||||
|
||||
func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) {
|
||||
@ -135,6 +141,7 @@ func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address
|
||||
|
||||
// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal
|
||||
// returns the Deal id if there is no error
|
||||
// TODO: Don't return deal ID
|
||||
func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) {
|
||||
log.Infow("DEAL ACCEPTED!")
|
||||
|
||||
@ -216,14 +223,17 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si
|
||||
return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil
|
||||
}
|
||||
|
||||
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
|
||||
func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
|
||||
return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb)
|
||||
return c.scMgr.OnDealSectorPreCommitted(ctx, provider, marketactor.DealProposal(proposal), *publishCid, cb)
|
||||
}
|
||||
|
||||
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
|
||||
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
|
||||
return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb)
|
||||
return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb)
|
||||
}
|
||||
|
||||
// TODO: Replace dealID parameter with DealProposal
|
||||
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
||||
head, err := c.ChainHead(ctx)
|
||||
if err != nil {
|
||||
|
345
markets/storageadapter/dealpublisher.go
Normal file
345
markets/storageadapter/dealpublisher.go
Normal file
@ -0,0 +1,345 @@
|
||||
package storageadapter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type dealPublisherAPI interface {
|
||||
ChainHead(context.Context) (*types.TipSet, error)
|
||||
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
|
||||
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
|
||||
}
|
||||
|
||||
// DealPublisher batches deal publishing so that many deals can be included in
|
||||
// a single publish message. This saves gas for miners that publish deals
|
||||
// frequently.
|
||||
// When a deal is submitted, the DealPublisher waits a configurable amount of
|
||||
// time for other deals to be submitted before sending the publish message.
|
||||
// There is a configurable maximum number of deals that can be included in one
|
||||
// message. When the limit is reached the DealPublisher immediately submits a
|
||||
// publish message with all deals in the queue.
|
||||
type DealPublisher struct {
|
||||
api dealPublisherAPI
|
||||
|
||||
ctx context.Context
|
||||
Shutdown context.CancelFunc
|
||||
|
||||
maxDealsPerPublishMsg uint64
|
||||
publishPeriod time.Duration
|
||||
publishSpec *api.MessageSendSpec
|
||||
|
||||
lk sync.Mutex
|
||||
pending []*pendingDeal
|
||||
cancelWaitForMoreDeals context.CancelFunc
|
||||
publishPeriodStart time.Time
|
||||
}
|
||||
|
||||
// A deal that is queued to be published
|
||||
type pendingDeal struct {
|
||||
ctx context.Context
|
||||
deal market2.ClientDealProposal
|
||||
Result chan publishResult
|
||||
}
|
||||
|
||||
// The result of publishing a deal
|
||||
type publishResult struct {
|
||||
msgCid cid.Cid
|
||||
err error
|
||||
}
|
||||
|
||||
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
|
||||
return &pendingDeal{
|
||||
ctx: ctx,
|
||||
deal: deal,
|
||||
Result: make(chan publishResult),
|
||||
}
|
||||
}
|
||||
|
||||
type PublishMsgConfig struct {
|
||||
// The amount of time to wait for more deals to arrive before
|
||||
// publishing
|
||||
Period time.Duration
|
||||
// The maximum number of deals to include in a single PublishStorageDeals
|
||||
// message
|
||||
MaxDealsPerMsg uint64
|
||||
}
|
||||
|
||||
func NewDealPublisher(
|
||||
feeConfig *config.MinerFeeConfig,
|
||||
publishMsgCfg PublishMsgConfig,
|
||||
) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
|
||||
return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
|
||||
maxFee := abi.NewTokenAmount(0)
|
||||
if feeConfig != nil {
|
||||
maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee)
|
||||
}
|
||||
publishSpec := &api.MessageSendSpec{MaxFee: maxFee}
|
||||
dp := newDealPublisher(full, publishMsgCfg, publishSpec)
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
dp.Shutdown()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return dp
|
||||
}
|
||||
}
|
||||
|
||||
func newDealPublisher(
|
||||
dpapi dealPublisherAPI,
|
||||
publishMsgCfg PublishMsgConfig,
|
||||
publishSpec *api.MessageSendSpec,
|
||||
) *DealPublisher {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &DealPublisher{
|
||||
api: dpapi,
|
||||
ctx: ctx,
|
||||
Shutdown: cancel,
|
||||
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
|
||||
publishPeriod: publishMsgCfg.Period,
|
||||
publishSpec: publishSpec,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) {
|
||||
pdeal := newPendingDeal(ctx, deal)
|
||||
|
||||
// Add the deal to the queue
|
||||
p.processNewDeal(pdeal)
|
||||
|
||||
// Wait for the deal to be submitted
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
case res := <-pdeal.Result:
|
||||
return res.msgCid, res.err
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
|
||||
p.lk.Lock()
|
||||
defer p.lk.Unlock()
|
||||
|
||||
// Filter out any cancelled deals
|
||||
p.filterCancelledDeals()
|
||||
|
||||
// If all deals have been cancelled, clear the wait-for-deals timer
|
||||
if len(p.pending) == 0 && p.cancelWaitForMoreDeals != nil {
|
||||
p.cancelWaitForMoreDeals()
|
||||
p.cancelWaitForMoreDeals = nil
|
||||
}
|
||||
|
||||
// Make sure the new deal hasn't been cancelled
|
||||
if pdeal.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Add the new deal to the queue
|
||||
p.pending = append(p.pending, pdeal)
|
||||
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
|
||||
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)
|
||||
|
||||
// If the maximum number of deals per message has been reached,
|
||||
// send a publish message
|
||||
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
|
||||
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
|
||||
p.publishAllDeals()
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise wait for more deals to arrive or the timeout to be reached
|
||||
p.waitForMoreDeals()
|
||||
}
|
||||
|
||||
func (p *DealPublisher) waitForMoreDeals() {
|
||||
// Check if we're already waiting for deals
|
||||
if !p.publishPeriodStart.IsZero() {
|
||||
elapsed := time.Since(p.publishPeriodStart)
|
||||
log.Infof("%s elapsed of / %s until publish deals queue is published",
|
||||
elapsed, p.publishPeriod)
|
||||
return
|
||||
}
|
||||
|
||||
// Set a timeout to wait for more deals to arrive
|
||||
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
|
||||
ctx, cancel := context.WithCancel(p.ctx)
|
||||
p.publishPeriodStart = time.Now()
|
||||
p.cancelWaitForMoreDeals = cancel
|
||||
|
||||
go func() {
|
||||
timer := time.NewTimer(p.publishPeriod)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
case <-timer.C:
|
||||
p.lk.Lock()
|
||||
defer p.lk.Unlock()
|
||||
|
||||
// The timeout has expired so publish all pending deals
|
||||
log.Infof("publish deals queue period of %s has expired, publishing deals", p.publishPeriod)
|
||||
p.publishAllDeals()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *DealPublisher) publishAllDeals() {
|
||||
// If the timeout hasn't yet been cancelled, cancel it
|
||||
if p.cancelWaitForMoreDeals != nil {
|
||||
p.cancelWaitForMoreDeals()
|
||||
p.cancelWaitForMoreDeals = nil
|
||||
p.publishPeriodStart = time.Time{}
|
||||
}
|
||||
|
||||
// Filter out any deals that have been cancelled
|
||||
p.filterCancelledDeals()
|
||||
deals := p.pending[:]
|
||||
p.pending = nil
|
||||
|
||||
// Send the publish message
|
||||
go p.publishReady(deals)
|
||||
}
|
||||
|
||||
func (p *DealPublisher) publishReady(ready []*pendingDeal) {
|
||||
if len(ready) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// onComplete is called when the publish message has been sent or there
|
||||
// was an error
|
||||
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
|
||||
// Send the publish result on the pending deal's Result channel
|
||||
res := publishResult{
|
||||
msgCid: msgCid,
|
||||
err: err,
|
||||
}
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
case <-pd.ctx.Done():
|
||||
case pd.Result <- res:
|
||||
}
|
||||
}
|
||||
|
||||
// Validate each deal to make sure it can be published
|
||||
validated := make([]*pendingDeal, 0, len(ready))
|
||||
deals := make([]market2.ClientDealProposal, 0, len(ready))
|
||||
for _, pd := range ready {
|
||||
// Validate the deal
|
||||
if err := p.validateDeal(pd.deal); err != nil {
|
||||
// Validation failed, complete immediately with an error
|
||||
go onComplete(pd, cid.Undef, err)
|
||||
continue
|
||||
}
|
||||
|
||||
validated = append(validated, pd)
|
||||
deals = append(deals, pd.deal)
|
||||
}
|
||||
|
||||
// Send the publish message
|
||||
msgCid, err := p.publishDealProposals(deals)
|
||||
|
||||
// Signal that each deal has been published
|
||||
for _, pd := range validated {
|
||||
go onComplete(pd, msgCid, err)
|
||||
}
|
||||
}
|
||||
|
||||
// validateDeal checks that the deal proposal start epoch hasn't already
|
||||
// elapsed
|
||||
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
|
||||
head, err := p.api.ChainHead(p.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if head.Height() > deal.Proposal.StartEpoch {
|
||||
return xerrors.Errorf(
|
||||
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
|
||||
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sends the publish message
|
||||
func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) {
|
||||
if len(deals) == 0 {
|
||||
return cid.Undef, nil
|
||||
}
|
||||
|
||||
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
|
||||
|
||||
provider := deals[0].Proposal.Provider
|
||||
for _, dl := range deals {
|
||||
if dl.Proposal.Provider != provider {
|
||||
msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) +
|
||||
"not all deals are for same provider: " +
|
||||
fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) +
|
||||
fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider)
|
||||
return cid.Undef, xerrors.Errorf(msg)
|
||||
}
|
||||
}
|
||||
|
||||
mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{
|
||||
Deals: deals,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
||||
}
|
||||
|
||||
smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{
|
||||
To: market.Address,
|
||||
From: mi.Worker,
|
||||
Value: types.NewInt(0),
|
||||
Method: market.Methods.PublishStorageDeals,
|
||||
Params: params,
|
||||
}, p.publishSpec)
|
||||
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
return smsg.Cid(), nil
|
||||
}
|
||||
|
||||
func pieceCids(deals []market2.ClientDealProposal) string {
|
||||
cids := make([]string, 0, len(deals))
|
||||
for _, dl := range deals {
|
||||
cids = append(cids, dl.Proposal.PieceCID.String())
|
||||
}
|
||||
return strings.Join(cids, ", ")
|
||||
}
|
||||
|
||||
// filter out deals that have been cancelled
|
||||
func (p *DealPublisher) filterCancelledDeals() {
|
||||
i := 0
|
||||
for _, pd := range p.pending {
|
||||
if pd.ctx.Err() == nil {
|
||||
p.pending[i] = pd
|
||||
i++
|
||||
}
|
||||
}
|
||||
p.pending = p.pending[:i]
|
||||
}
|
266
markets/storageadapter/dealpublisher_test.go
Normal file
266
markets/storageadapter/dealpublisher_test.go
Normal file
@ -0,0 +1,266 @@
|
||||
package storageadapter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
)
|
||||
|
||||
func TestDealPublisher(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
publishPeriod time.Duration
|
||||
maxDealsPerMsg uint64
|
||||
dealCountWithinPublishPeriod int
|
||||
ctxCancelledWithinPublishPeriod int
|
||||
expiredDeals int
|
||||
dealCountAfterPublishPeriod int
|
||||
expectedDealsPerMsg []int
|
||||
}{{
|
||||
name: "publish one deal within publish period",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 5,
|
||||
dealCountWithinPublishPeriod: 1,
|
||||
dealCountAfterPublishPeriod: 0,
|
||||
expectedDealsPerMsg: []int{1},
|
||||
}, {
|
||||
name: "publish two deals within publish period",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 5,
|
||||
dealCountWithinPublishPeriod: 2,
|
||||
dealCountAfterPublishPeriod: 0,
|
||||
expectedDealsPerMsg: []int{2},
|
||||
}, {
|
||||
name: "publish one deal within publish period, and one after",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 5,
|
||||
dealCountWithinPublishPeriod: 1,
|
||||
dealCountAfterPublishPeriod: 1,
|
||||
expectedDealsPerMsg: []int{1, 1},
|
||||
}, {
|
||||
name: "publish deals that exceed max deals per message within publish period, and one after",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 2,
|
||||
dealCountWithinPublishPeriod: 3,
|
||||
dealCountAfterPublishPeriod: 1,
|
||||
expectedDealsPerMsg: []int{2, 1, 1},
|
||||
}, {
|
||||
name: "ignore deals with cancelled context",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 5,
|
||||
dealCountWithinPublishPeriod: 2,
|
||||
ctxCancelledWithinPublishPeriod: 2,
|
||||
dealCountAfterPublishPeriod: 1,
|
||||
expectedDealsPerMsg: []int{2, 1},
|
||||
}, {
|
||||
name: "ignore expired deals",
|
||||
publishPeriod: 10 * time.Millisecond,
|
||||
maxDealsPerMsg: 5,
|
||||
dealCountWithinPublishPeriod: 2,
|
||||
expiredDeals: 2,
|
||||
dealCountAfterPublishPeriod: 1,
|
||||
expectedDealsPerMsg: []int{2, 1},
|
||||
}, {
|
||||
name: "zero config",
|
||||
publishPeriod: 0,
|
||||
maxDealsPerMsg: 0,
|
||||
dealCountWithinPublishPeriod: 2,
|
||||
ctxCancelledWithinPublishPeriod: 0,
|
||||
dealCountAfterPublishPeriod: 2,
|
||||
expectedDealsPerMsg: []int{1, 1, 1, 1},
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
client := tutils.NewActorAddr(t, "client")
|
||||
provider := tutils.NewActorAddr(t, "provider")
|
||||
worker := tutils.NewActorAddr(t, "worker")
|
||||
dpapi := newDPAPI(t, worker)
|
||||
|
||||
// Create a deal publisher
|
||||
dp := newDealPublisher(dpapi, PublishMsgConfig{
|
||||
Period: tc.publishPeriod,
|
||||
MaxDealsPerMsg: tc.maxDealsPerMsg,
|
||||
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
|
||||
|
||||
// Keep a record of the deals that were submitted to be published
|
||||
var dealsToPublish []market.ClientDealProposal
|
||||
publishDeal := func(ctxCancelled bool, expired bool) {
|
||||
pctx := ctx
|
||||
var cancel context.CancelFunc
|
||||
if ctxCancelled {
|
||||
pctx, cancel = context.WithCancel(ctx)
|
||||
cancel()
|
||||
}
|
||||
|
||||
startEpoch := abi.ChainEpoch(20)
|
||||
if expired {
|
||||
startEpoch = abi.ChainEpoch(5)
|
||||
}
|
||||
deal := market.ClientDealProposal{
|
||||
Proposal: market0.DealProposal{
|
||||
PieceCID: generateCids(1)[0],
|
||||
Client: client,
|
||||
Provider: provider,
|
||||
StartEpoch: startEpoch,
|
||||
EndEpoch: abi.ChainEpoch(120),
|
||||
},
|
||||
ClientSignature: crypto.Signature{
|
||||
Type: crypto.SigTypeSecp256k1,
|
||||
Data: []byte("signature data"),
|
||||
},
|
||||
}
|
||||
if !ctxCancelled && !expired {
|
||||
dealsToPublish = append(dealsToPublish, deal)
|
||||
}
|
||||
go func() {
|
||||
_, err := dp.Publish(pctx, deal)
|
||||
if ctxCancelled || expired {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Publish deals within publish period
|
||||
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
|
||||
publishDeal(false, false)
|
||||
}
|
||||
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
|
||||
publishDeal(true, false)
|
||||
}
|
||||
for i := 0; i < tc.expiredDeals; i++ {
|
||||
publishDeal(false, true)
|
||||
}
|
||||
|
||||
// Wait until publish period has elapsed
|
||||
time.Sleep(2 * tc.publishPeriod)
|
||||
|
||||
// Publish deals after publish period
|
||||
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
|
||||
publishDeal(false, false)
|
||||
}
|
||||
|
||||
// For each message that was expected to be sent
|
||||
var publishedDeals []market.ClientDealProposal
|
||||
for _, expectedDealsInMsg := range tc.expectedDealsPerMsg {
|
||||
// Should have called StateMinerInfo with the provider address
|
||||
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
|
||||
require.Equal(t, provider, stateMinerInfoAddr)
|
||||
|
||||
// Check the fields of the message that was sent
|
||||
msg := <-dpapi.pushedMsgs
|
||||
require.Equal(t, worker, msg.From)
|
||||
require.Equal(t, market.Address, msg.To)
|
||||
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
|
||||
|
||||
// Check that the expected number of deals was included in the message
|
||||
var params market2.PublishStorageDealsParams
|
||||
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, params.Deals, expectedDealsInMsg)
|
||||
|
||||
// Keep track of the deals that were sent
|
||||
for _, d := range params.Deals {
|
||||
publishedDeals = append(publishedDeals, d)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that all deals that were submitted to be published were
|
||||
// sent out (we do this by ensuring all the piece CIDs are present)
|
||||
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool {
|
||||
cidsA := dealPieceCids(sent)
|
||||
cidsB := dealPieceCids(exp)
|
||||
|
||||
if len(cidsA) != len(cidsB) {
|
||||
return false
|
||||
}
|
||||
|
||||
s1 := cid.NewSet()
|
||||
for _, c := range cidsA {
|
||||
s1.Add(c)
|
||||
}
|
||||
|
||||
for _, c := range cidsB {
|
||||
if !s1.Has(c) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func dealPieceCids(deals []market2.ClientDealProposal) []cid.Cid {
|
||||
cids := make([]cid.Cid, 0, len(deals))
|
||||
for _, dl := range deals {
|
||||
cids = append(cids, dl.Proposal.PieceCID)
|
||||
}
|
||||
return cids
|
||||
}
|
||||
|
||||
type dpAPI struct {
|
||||
t *testing.T
|
||||
worker address.Address
|
||||
|
||||
stateMinerInfoCalls chan address.Address
|
||||
pushedMsgs chan *types.Message
|
||||
}
|
||||
|
||||
func newDPAPI(t *testing.T, worker address.Address) *dpAPI {
|
||||
return &dpAPI{
|
||||
t: t,
|
||||
worker: worker,
|
||||
stateMinerInfoCalls: make(chan address.Address, 128),
|
||||
pushedMsgs: make(chan *types.Message, 128),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||
dummyCid, err := cid.Parse("bafkqaaa")
|
||||
require.NoError(d.t, err)
|
||||
return types.NewTipSet([]*types.BlockHeader{{
|
||||
Miner: tutils.NewActorAddr(d.t, "miner"),
|
||||
Height: abi.ChainEpoch(10),
|
||||
ParentStateRoot: dummyCid,
|
||||
Messages: dummyCid,
|
||||
ParentMessageReceipts: dummyCid,
|
||||
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
|
||||
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
|
||||
}})
|
||||
}
|
||||
|
||||
func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) {
|
||||
d.stateMinerInfoCalls <- address
|
||||
return miner.MinerInfo{Worker: d.worker}, nil
|
||||
}
|
||||
|
||||
func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||
d.pushedMsgs <- msg
|
||||
return &types.SignedMessage{Message: *msg}, nil
|
||||
}
|
@ -1,102 +0,0 @@
|
||||
package storageadapter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type getCurrentDealInfoAPI interface {
|
||||
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
|
||||
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
|
||||
|
||||
diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
|
||||
}
|
||||
|
||||
// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
|
||||
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) {
|
||||
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
|
||||
if dealErr == nil {
|
||||
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
||||
if err != nil {
|
||||
return dealID, nil, types.EmptyTSK, err
|
||||
}
|
||||
if equal {
|
||||
return dealID, marketDeal, types.EmptyTSK, nil
|
||||
}
|
||||
dealErr = xerrors.Errorf("Deal proposals did not match")
|
||||
}
|
||||
if publishCid == nil {
|
||||
return dealID, nil, types.EmptyTSK, dealErr
|
||||
}
|
||||
// attempt deal id correction
|
||||
lookup, err := api.StateSearchMsg(ctx, *publishCid)
|
||||
if err != nil {
|
||||
return dealID, nil, types.EmptyTSK, err
|
||||
}
|
||||
|
||||
if lookup.Receipt.ExitCode != exitcode.Ok {
|
||||
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
var retval market.PublishStorageDealsReturn
|
||||
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
|
||||
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
|
||||
}
|
||||
|
||||
if len(retval.IDs) != 1 {
|
||||
// market currently only ever sends messages with 1 deal
|
||||
return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
|
||||
}
|
||||
|
||||
if retval.IDs[0] == dealID {
|
||||
// DealID did not change, so we are stuck with the original lookup error
|
||||
return dealID, nil, lookup.TipSet, dealErr
|
||||
}
|
||||
|
||||
dealID = retval.IDs[0]
|
||||
marketDeal, err = api.StateMarketStorageDeal(ctx, dealID, ts.Key())
|
||||
|
||||
if err == nil {
|
||||
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
||||
if err != nil {
|
||||
return dealID, nil, types.EmptyTSK, err
|
||||
}
|
||||
if !equal {
|
||||
return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match")
|
||||
}
|
||||
}
|
||||
return dealID, marketDeal, lookup.TipSet, err
|
||||
}
|
||||
|
||||
func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {
|
||||
p1ClientID, err := api.StateLookupID(ctx, p1.Client, ts.Key())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
p2ClientID, err := api.StateLookupID(ctx, p2.Client, ts.Key())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return p1.PieceCID.Equals(p2.PieceCID) &&
|
||||
p1.PieceSize == p2.PieceSize &&
|
||||
p1.VerifiedDeal == p2.VerifiedDeal &&
|
||||
p1.Label == p2.Label &&
|
||||
p1.StartEpoch == p2.StartEpoch &&
|
||||
p1.EndEpoch == p2.EndEpoch &&
|
||||
p1.StoragePricePerEpoch.Equals(p2.StoragePricePerEpoch) &&
|
||||
p1.ProviderCollateral.Equals(p2.ProviderCollateral) &&
|
||||
p1.ClientCollateral.Equals(p2.ClientCollateral) &&
|
||||
p1.Provider == p2.Provider &&
|
||||
p1ClientID == p2ClientID, nil
|
||||
}
|
@ -1,268 +0,0 @@
|
||||
package storageadapter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var errNotFound = errors.New("Could not find")
|
||||
|
||||
func TestGetCurrentDealInfo(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dummyCid, _ := cid.Parse("bafkqaaa")
|
||||
startDealID := abi.DealID(rand.Uint64())
|
||||
newDealID := abi.DealID(rand.Uint64())
|
||||
twoValuesReturn := makePublishDealsReturnBytes(t, []abi.DealID{abi.DealID(rand.Uint64()), abi.DealID(rand.Uint64())})
|
||||
sameValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{startDealID})
|
||||
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
|
||||
proposal := market.DealProposal{
|
||||
PieceCID: dummyCid,
|
||||
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
|
||||
Label: "success",
|
||||
}
|
||||
otherProposal := market.DealProposal{
|
||||
PieceCID: dummyCid,
|
||||
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
|
||||
Label: "other",
|
||||
}
|
||||
successDeal := &api.MarketDeal{
|
||||
Proposal: proposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
otherDeal := &api.MarketDeal{
|
||||
Proposal: otherProposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
testCases := map[string]struct {
|
||||
searchMessageLookup *api.MsgLookup
|
||||
searchMessageErr error
|
||||
marketDeals map[abi.DealID]*api.MarketDeal
|
||||
publishCid *cid.Cid
|
||||
expectedDealID abi.DealID
|
||||
expectedMarketDeal *api.MarketDeal
|
||||
expectedError error
|
||||
}{
|
||||
"deal lookup succeeds": {
|
||||
marketDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: successDeal,
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedMarketDeal: successDeal,
|
||||
},
|
||||
"publish CID = nil": {
|
||||
expectedDealID: startDealID,
|
||||
expectedError: errNotFound,
|
||||
},
|
||||
"publish CID = nil, other deal on lookup": {
|
||||
marketDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: otherDeal,
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedError: xerrors.Errorf("Deal proposals did not match"),
|
||||
},
|
||||
"search message fails": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageErr: errors.New("something went wrong"),
|
||||
expectedDealID: startDealID,
|
||||
expectedError: errors.New("something went wrong"),
|
||||
},
|
||||
"return code not ok": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.ErrIllegalState,
|
||||
},
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState),
|
||||
},
|
||||
"unable to unmarshal params": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: []byte("applesauce"),
|
||||
},
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedError: xerrors.Errorf("looking for publish deal message: unmarshaling message return: cbor input should be of type array"),
|
||||
},
|
||||
"more than one returned id": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: twoValuesReturn,
|
||||
},
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedError: xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal"),
|
||||
},
|
||||
"deal ids still match": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: sameValueReturn,
|
||||
},
|
||||
},
|
||||
expectedDealID: startDealID,
|
||||
expectedError: errNotFound,
|
||||
},
|
||||
"new deal id success": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
marketDeals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: successDeal,
|
||||
},
|
||||
expectedDealID: newDealID,
|
||||
expectedMarketDeal: successDeal,
|
||||
},
|
||||
"new deal id after other deal found": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
marketDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: otherDeal,
|
||||
newDealID: successDeal,
|
||||
},
|
||||
expectedDealID: newDealID,
|
||||
expectedMarketDeal: successDeal,
|
||||
},
|
||||
"new deal id failure": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
expectedDealID: newDealID,
|
||||
expectedError: errNotFound,
|
||||
},
|
||||
"new deal id, failure due to other deal present": {
|
||||
publishCid: &dummyCid,
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
marketDeals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: otherDeal,
|
||||
},
|
||||
expectedDealID: newDealID,
|
||||
expectedError: xerrors.Errorf("Deal proposals did not match"),
|
||||
},
|
||||
}
|
||||
runTestCase := func(testCase string, data struct {
|
||||
searchMessageLookup *api.MsgLookup
|
||||
searchMessageErr error
|
||||
marketDeals map[abi.DealID]*api.MarketDeal
|
||||
publishCid *cid.Cid
|
||||
expectedDealID abi.DealID
|
||||
expectedMarketDeal *api.MarketDeal
|
||||
expectedError error
|
||||
}) {
|
||||
t.Run(testCase, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
ts, err := test.MockTipset(address.TestAddress, rand.Uint64())
|
||||
require.NoError(t, err)
|
||||
marketDeals := make(map[marketDealKey]*api.MarketDeal)
|
||||
for dealID, deal := range data.marketDeals {
|
||||
marketDeals[marketDealKey{dealID, ts.Key()}] = deal
|
||||
}
|
||||
api := &mockGetCurrentDealInfoAPI{
|
||||
SearchMessageLookup: data.searchMessageLookup,
|
||||
SearchMessageErr: data.searchMessageErr,
|
||||
MarketDeals: marketDeals,
|
||||
}
|
||||
|
||||
dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
|
||||
require.Equal(t, data.expectedDealID, dealID)
|
||||
require.Equal(t, data.expectedMarketDeal, marketDeal)
|
||||
if data.expectedError == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.EqualError(t, err, data.expectedError.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
for testCase, data := range testCases {
|
||||
runTestCase(testCase, data)
|
||||
}
|
||||
}
|
||||
|
||||
type marketDealKey struct {
|
||||
abi.DealID
|
||||
types.TipSetKey
|
||||
}
|
||||
|
||||
type mockGetCurrentDealInfoAPI struct {
|
||||
SearchMessageLookup *api.MsgLookup
|
||||
SearchMessageErr error
|
||||
|
||||
MarketDeals map[marketDealKey]*api.MarketDeal
|
||||
}
|
||||
|
||||
func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
|
||||
return &miner.PreCommitChanges{}, nil
|
||||
}
|
||||
|
||||
func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
|
||||
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
|
||||
if !ok {
|
||||
return nil, errNotFound
|
||||
}
|
||||
return deal, nil
|
||||
}
|
||||
|
||||
func (mapi *mockGetCurrentDealInfoAPI) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) {
|
||||
return mapi.SearchMessageLookup, mapi.SearchMessageErr
|
||||
}
|
||||
|
||||
func (mapi *mockGetCurrentDealInfoAPI) StateLookupID(ctx context.Context, addr address.Address, ts types.TipSetKey) (address.Address, error) {
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
func makePublishDealsReturnBytes(t *testing.T, dealIDs []abi.DealID) []byte {
|
||||
buf := new(bytes.Buffer)
|
||||
dealsReturn := market.PublishStorageDealsReturn{
|
||||
IDs: dealIDs,
|
||||
}
|
||||
err := dealsReturn.MarshalCBOR(buf)
|
||||
require.NoError(t, err)
|
||||
return buf.Bytes()
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -19,11 +20,40 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
type sectorCommittedEventsAPI interface {
|
||||
type eventsCalledAPI interface {
|
||||
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
|
||||
}
|
||||
|
||||
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
|
||||
type dealInfoAPI interface {
|
||||
GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error)
|
||||
}
|
||||
|
||||
type diffPreCommitsAPI interface {
|
||||
diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
|
||||
}
|
||||
|
||||
type SectorCommittedManager struct {
|
||||
ev eventsCalledAPI
|
||||
dealInfo dealInfoAPI
|
||||
dpc diffPreCommitsAPI
|
||||
}
|
||||
|
||||
func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI sealing.CurrentDealInfoTskAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
|
||||
dim := &sealing.CurrentDealInfoManager{
|
||||
CDAPI: &sealing.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI},
|
||||
}
|
||||
return newSectorCommittedManager(ev, dim, dpcAPI)
|
||||
}
|
||||
|
||||
func newSectorCommittedManager(ev eventsCalledAPI, dealInfo dealInfoAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
|
||||
return &SectorCommittedManager{
|
||||
ev: ev,
|
||||
dealInfo: dealInfo,
|
||||
dpc: dpcAPI,
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
|
||||
// Ensure callback is only called once
|
||||
var once sync.Once
|
||||
cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) {
|
||||
@ -34,7 +64,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
||||
|
||||
// First check if the deal is already active, and if so, bail out
|
||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
||||
dealInfo, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
|
||||
if err != nil {
|
||||
// Note: the error returned from here will end up being returned
|
||||
// from OnDealSectorPreCommitted so no need to call the callback
|
||||
@ -54,24 +84,19 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
||||
// when the client node was down after the deal was published, and when
|
||||
// the precommit containing it landed on chain)
|
||||
|
||||
if publishTs == types.EmptyTSK {
|
||||
lookup, err := api.StateSearchMsg(ctx, *publishCid)
|
||||
publishTs, err := types.TipSetKeyFromBytes(dealInfo.PublishMsgTipSet)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
if lookup != nil { // can be nil in tests
|
||||
publishTs = lookup.TipSet
|
||||
}
|
||||
}
|
||||
|
||||
diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key())
|
||||
diff, err := mgr.dpc.diffPreCommits(ctx, provider, publishTs, ts.Key())
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
||||
for _, info := range diff.Added {
|
||||
for _, d := range info.Info.DealIDs {
|
||||
if d == di {
|
||||
if d == dealInfo.DealID {
|
||||
cb(info.Info.SectorNumber, false, nil)
|
||||
return true, false, nil
|
||||
}
|
||||
@ -103,7 +128,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
||||
// If the deal hasn't been activated by the proposed start epoch, the
|
||||
// deal will timeout (when msg == nil it means the timeout epoch was reached)
|
||||
if msg == nil {
|
||||
err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
|
||||
err = xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch)
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -118,16 +143,16 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
||||
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
|
||||
}
|
||||
|
||||
// When the deal is published, the deal ID may change, so get the
|
||||
// When there is a reorg, the deal ID may change, so get the
|
||||
// current deal ID from the publish message CID
|
||||
dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Check through the deal IDs associated with this message
|
||||
for _, did := range params.DealIDs {
|
||||
if did == dealID {
|
||||
if did == res.DealID {
|
||||
// Found the deal ID in this message. Callback with the sector ID.
|
||||
cb(params.SectorNumber, false, nil)
|
||||
return false, nil
|
||||
@ -144,14 +169,14 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
|
||||
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
|
||||
return xerrors.Errorf("failed to set up called handler: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
|
||||
func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, provider address.Address, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
|
||||
// Ensure callback is only called once
|
||||
var once sync.Once
|
||||
cb := func(err error) {
|
||||
@ -162,7 +187,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
||||
|
||||
// First check if the deal is already active, and if so, bail out
|
||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
_, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
||||
_, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
|
||||
if err != nil {
|
||||
// Note: the error returned from here will end up being returned
|
||||
// from OnDealSectorCommitted so no need to call the callback
|
||||
@ -208,7 +233,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
||||
// If the deal hasn't been activated by the proposed start epoch, the
|
||||
// deal will timeout (when msg == nil it means the timeout epoch was reached)
|
||||
if msg == nil {
|
||||
err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
|
||||
err := xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch)
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -218,17 +243,17 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
||||
}
|
||||
|
||||
// Get the deal info
|
||||
_, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
// Make sure the deal is active
|
||||
if sd.State.SectorStartEpoch < 1 {
|
||||
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
|
||||
if res.MarketDeal.State.SectorStartEpoch < 1 {
|
||||
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", res.DealID, ts.ParentState(), ts.Height())
|
||||
}
|
||||
|
||||
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
|
||||
log.Infof("Storage deal %d activated at epoch %d", res.DealID, res.MarketDeal.State.SectorStartEpoch)
|
||||
|
||||
cb(nil)
|
||||
|
||||
@ -241,29 +266,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
|
||||
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
|
||||
return xerrors.Errorf("failed to set up called handler: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) {
|
||||
di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||
func (mgr *SectorCommittedManager) checkIfDealAlreadyActive(ctx context.Context, ts *types.TipSet, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, bool, error) {
|
||||
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), proposal, publishCid)
|
||||
if err != nil {
|
||||
// TODO: This may be fine for some errors
|
||||
return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
// Sector with deal is already active
|
||||
if sd.State.SectorStartEpoch > 0 {
|
||||
return 0, true, publishTs, nil
|
||||
return res, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
// Sector was slashed
|
||||
if sd.State.SlashEpoch > 0 {
|
||||
return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
|
||||
if res.MarketDeal.State.SlashEpoch > 0 {
|
||||
return res, false, xerrors.Errorf("deal %d was slashed at epoch %d", res.DealID, res.MarketDeal.State.SlashEpoch)
|
||||
}
|
||||
|
||||
return di, false, publishTs, nil
|
||||
// Sector with deal is already active
|
||||
if res.MarketDeal.State.SectorStartEpoch > 0 {
|
||||
return res, true, nil
|
||||
}
|
||||
|
||||
return res, false, nil
|
||||
}
|
||||
|
@ -7,6 +7,9 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -15,13 +18,13 @@ import (
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/cbor"
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -32,13 +35,16 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
publishCid := generateCids(1)[0]
|
||||
sealedCid := generateCids(1)[0]
|
||||
pieceCid := generateCids(1)[0]
|
||||
startDealID := abi.DealID(rand.Uint64())
|
||||
newDealID := abi.DealID(rand.Uint64())
|
||||
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
|
||||
dealID := abi.DealID(rand.Uint64())
|
||||
sectorNumber := abi.SectorNumber(rand.Uint64())
|
||||
proposal := market.DealProposal{
|
||||
PieceCID: pieceCid,
|
||||
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
|
||||
Client: tutils.NewActorAddr(t, "client"),
|
||||
Provider: tutils.NewActorAddr(t, "provider"),
|
||||
StoragePricePerEpoch: abi.NewTokenAmount(1),
|
||||
ProviderCollateral: abi.NewTokenAmount(1),
|
||||
ClientCollateral: abi.NewTokenAmount(1),
|
||||
Label: "success",
|
||||
}
|
||||
unfinishedDeal := &api.MarketDeal{
|
||||
@ -48,17 +54,26 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
successDeal := &api.MarketDeal{
|
||||
activeDeal := &api.MarketDeal{
|
||||
Proposal: proposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
slashedDeal := &api.MarketDeal{
|
||||
Proposal: proposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
SlashEpoch: 2,
|
||||
},
|
||||
}
|
||||
type testCase struct {
|
||||
searchMessageLookup *api.MsgLookup
|
||||
searchMessageErr error
|
||||
checkTsDeals map[abi.DealID]*api.MarketDeal
|
||||
currentDealInfo sealing.CurrentDealInfo
|
||||
currentDealInfoErr error
|
||||
currentDealInfoErr2 error
|
||||
preCommitDiff *miner.PreCommitChanges
|
||||
matchStates []matchState
|
||||
dealStartEpochTimeout bool
|
||||
expectedCBCallCount uint64
|
||||
@ -69,45 +84,17 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
}
|
||||
testCases := map[string]testCase{
|
||||
"normal sequence": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
|
||||
SectorNumber: sectorNumber,
|
||||
SealedCID: sealedCid,
|
||||
DealIDs: []abi.DealID{startDealID},
|
||||
DealIDs: []abi.DealID{dealID},
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBIsActive: false,
|
||||
expectedCBSectorNumber: sectorNumber,
|
||||
},
|
||||
"deal id changes in called": {
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: unfinishedDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
|
||||
SectorNumber: sectorNumber,
|
||||
SealedCID: sealedCid,
|
||||
DealIDs: []abi.DealID{newDealID},
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: unfinishedDeal,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
@ -115,85 +102,98 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
expectedCBSectorNumber: sectorNumber,
|
||||
},
|
||||
"ignores unsuccessful pre-commit message": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
|
||||
SectorNumber: sectorNumber,
|
||||
SealedCID: sealedCid,
|
||||
DealIDs: []abi.DealID{startDealID},
|
||||
DealIDs: []abi.DealID{dealID},
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
},
|
||||
// non-zero exit code indicates unsuccessful pre-commit message
|
||||
receipt: &types.MessageReceipt{ExitCode: 1},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 0,
|
||||
},
|
||||
"error on deal in check": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
|
||||
searchMessageErr: errors.New("something went wrong"),
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
|
||||
"deal already pre-committed": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
"sector start epoch > 0 in check": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: successDeal,
|
||||
preCommitDiff: &miner.PreCommitChanges{
|
||||
Added: []miner.SectorPreCommitOnChainInfo{{
|
||||
Info: miner.SectorPreCommitInfo{
|
||||
SectorNumber: sectorNumber,
|
||||
DealIDs: []abi.DealID{dealID},
|
||||
},
|
||||
}},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBIsActive: false,
|
||||
expectedCBSectorNumber: sectorNumber,
|
||||
},
|
||||
"error getting current deal info in check func": {
|
||||
currentDealInfoErr: errors.New("something went wrong"),
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"),
|
||||
},
|
||||
"sector already active": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: activeDeal,
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBIsActive: true,
|
||||
},
|
||||
"error on deal in pre-commit": {
|
||||
searchMessageErr: errors.New("something went wrong"),
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
"sector was slashed": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: slashedDeal,
|
||||
PublishMsgTipSet: nil,
|
||||
},
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch),
|
||||
},
|
||||
"error getting current deal info in called func": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
currentDealInfoErr2: errors.New("something went wrong"),
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
|
||||
SectorNumber: sectorNumber,
|
||||
SealedCID: sealedCid,
|
||||
DealIDs: []abi.DealID{startDealID},
|
||||
DealIDs: []abi.DealID{dealID},
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: errors.New("failed to set up called handler: something went wrong"),
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: errors.New("handling applied event: something went wrong"),
|
||||
},
|
||||
"proposed deal epoch timeout": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: activeDeal,
|
||||
},
|
||||
dealStartEpochTimeout: true,
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
|
||||
expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID),
|
||||
},
|
||||
}
|
||||
runTestCase := func(testCase string, data testCase) {
|
||||
t.Run(testCase, func(t *testing.T) {
|
||||
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
// defer cancel()
|
||||
api := &mockGetCurrentDealInfoAPI{
|
||||
SearchMessageLookup: data.searchMessageLookup,
|
||||
SearchMessageErr: data.searchMessageErr,
|
||||
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
|
||||
}
|
||||
checkTs, err := test.MockTipset(provider, rand.Uint64())
|
||||
require.NoError(t, err)
|
||||
for dealID, deal := range data.checkTsDeals {
|
||||
api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal
|
||||
}
|
||||
matchMessages := make([]matchMessage, len(data.matchStates))
|
||||
for i, ms := range data.matchStates {
|
||||
matchTs, err := test.MockTipset(provider, rand.Uint64())
|
||||
require.NoError(t, err)
|
||||
for dealID, deal := range ms.deals {
|
||||
api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal
|
||||
}
|
||||
matchMessages[i] = matchMessage{
|
||||
curH: 5,
|
||||
msg: ms.msg,
|
||||
@ -217,7 +217,18 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
cbIsActive = isActive
|
||||
cbError = err
|
||||
}
|
||||
err = OnDealSectorPreCommitted(ctx, api, eventsAPI, provider, startDealID, proposal, &publishCid, cb)
|
||||
|
||||
mockPCAPI := &mockPreCommitsAPI{
|
||||
PCChanges: data.preCommitDiff,
|
||||
}
|
||||
mockDIAPI := &mockDealInfoAPI{
|
||||
CurrentDealInfo: data.currentDealInfo,
|
||||
CurrentDealInfo2: data.currentDealInfo,
|
||||
Err: data.currentDealInfoErr,
|
||||
Err2: data.currentDealInfoErr2,
|
||||
}
|
||||
scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI)
|
||||
err = scm.OnDealSectorPreCommitted(ctx, provider, proposal, publishCid, cb)
|
||||
if data.expectedError == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
@ -240,16 +251,18 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
||||
|
||||
func TestOnDealSectorCommitted(t *testing.T) {
|
||||
provider := address.TestAddress
|
||||
ctx := context.Background()
|
||||
publishCid := generateCids(1)[0]
|
||||
pieceCid := generateCids(1)[0]
|
||||
startDealID := abi.DealID(rand.Uint64())
|
||||
newDealID := abi.DealID(rand.Uint64())
|
||||
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
|
||||
dealID := abi.DealID(rand.Uint64())
|
||||
sectorNumber := abi.SectorNumber(rand.Uint64())
|
||||
proposal := market.DealProposal{
|
||||
PieceCID: pieceCid,
|
||||
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
|
||||
Client: tutils.NewActorAddr(t, "client"),
|
||||
Provider: tutils.NewActorAddr(t, "provider"),
|
||||
StoragePricePerEpoch: abi.NewTokenAmount(1),
|
||||
ProviderCollateral: abi.NewTokenAmount(1),
|
||||
ClientCollateral: abi.NewTokenAmount(1),
|
||||
Label: "success",
|
||||
}
|
||||
unfinishedDeal := &api.MarketDeal{
|
||||
@ -259,17 +272,26 @@ func TestOnDealSectorCommitted(t *testing.T) {
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
successDeal := &api.MarketDeal{
|
||||
activeDeal := &api.MarketDeal{
|
||||
Proposal: proposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
},
|
||||
}
|
||||
slashedDeal := &api.MarketDeal{
|
||||
Proposal: proposal,
|
||||
State: market.DealState{
|
||||
SectorStartEpoch: 1,
|
||||
LastUpdatedEpoch: 2,
|
||||
SlashEpoch: 2,
|
||||
},
|
||||
}
|
||||
type testCase struct {
|
||||
searchMessageLookup *api.MsgLookup
|
||||
searchMessageErr error
|
||||
checkTsDeals map[abi.DealID]*api.MarketDeal
|
||||
currentDealInfo sealing.CurrentDealInfo
|
||||
currentDealInfoErr error
|
||||
currentDealInfo2 sealing.CurrentDealInfo
|
||||
currentDealInfoErr2 error
|
||||
matchStates []matchState
|
||||
dealStartEpochTimeout bool
|
||||
expectedCBCallCount uint64
|
||||
@ -278,121 +300,118 @@ func TestOnDealSectorCommitted(t *testing.T) {
|
||||
}
|
||||
testCases := map[string]testCase{
|
||||
"normal sequence": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
currentDealInfo2: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: activeDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sectorNumber,
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: successDeal,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
},
|
||||
"deal id changes in called": {
|
||||
searchMessageLookup: &api.MsgLookup{
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: exitcode.Ok,
|
||||
Return: newValueReturn,
|
||||
},
|
||||
},
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: unfinishedDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sectorNumber,
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: successDeal,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
},
|
||||
"ignores unsuccessful prove-commit message": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
currentDealInfo2: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: activeDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sectorNumber,
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: successDeal,
|
||||
},
|
||||
// Exit-code 1 means the prove-commit was unsuccessful
|
||||
receipt: &types.MessageReceipt{ExitCode: 1},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 0,
|
||||
},
|
||||
"error on deal in check": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
|
||||
searchMessageErr: errors.New("something went wrong"),
|
||||
"error getting current deal info in check func": {
|
||||
currentDealInfoErr: errors.New("something went wrong"),
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
|
||||
expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"),
|
||||
},
|
||||
"sector start epoch > 0 in check": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: successDeal,
|
||||
"sector already active": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: activeDeal,
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
},
|
||||
"error on deal in called": {
|
||||
searchMessageErr: errors.New("something went wrong"),
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
"sector was slashed": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: slashedDeal,
|
||||
},
|
||||
expectedCBCallCount: 0,
|
||||
expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch),
|
||||
},
|
||||
"error getting current deal info in called func": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
currentDealInfoErr2: errors.New("something went wrong"),
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sectorNumber,
|
||||
}),
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: xerrors.Errorf("handling applied event: failed to look up deal on chain: something went wrong"),
|
||||
},
|
||||
"proposed deal epoch timeout": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
dealStartEpochTimeout: true,
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID),
|
||||
},
|
||||
"got prove-commit but deal not active": {
|
||||
currentDealInfo: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
currentDealInfo2: sealing.CurrentDealInfo{
|
||||
DealID: dealID,
|
||||
MarketDeal: unfinishedDeal,
|
||||
},
|
||||
matchStates: []matchState{
|
||||
{
|
||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sectorNumber,
|
||||
}),
|
||||
deals: map[abi.DealID]*api.MarketDeal{
|
||||
newDealID: successDeal,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"),
|
||||
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
|
||||
},
|
||||
"proposed deal epoch timeout": {
|
||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
||||
startDealID: unfinishedDeal,
|
||||
},
|
||||
dealStartEpochTimeout: true,
|
||||
expectedCBCallCount: 1,
|
||||
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
|
||||
expectedCBError: xerrors.Errorf("handling applied event: deal wasn't active: deal=%d, parentState=bafkqaaa, h=5", dealID),
|
||||
},
|
||||
}
|
||||
runTestCase := func(testCase string, data testCase) {
|
||||
t.Run(testCase, func(t *testing.T) {
|
||||
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
// defer cancel()
|
||||
api := &mockGetCurrentDealInfoAPI{
|
||||
SearchMessageLookup: data.searchMessageLookup,
|
||||
SearchMessageErr: data.searchMessageErr,
|
||||
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
checkTs, err := test.MockTipset(provider, rand.Uint64())
|
||||
require.NoError(t, err)
|
||||
for dealID, deal := range data.checkTsDeals {
|
||||
api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal
|
||||
}
|
||||
matchMessages := make([]matchMessage, len(data.matchStates))
|
||||
for i, ms := range data.matchStates {
|
||||
matchTs, err := test.MockTipset(provider, rand.Uint64())
|
||||
require.NoError(t, err)
|
||||
for dealID, deal := range ms.deals {
|
||||
api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal
|
||||
}
|
||||
matchMessages[i] = matchMessage{
|
||||
curH: 5,
|
||||
msg: ms.msg,
|
||||
@ -412,7 +431,15 @@ func TestOnDealSectorCommitted(t *testing.T) {
|
||||
cbCallCount++
|
||||
cbError = err
|
||||
}
|
||||
err = OnDealSectorCommitted(ctx, api, eventsAPI, provider, startDealID, sectorNumber, proposal, &publishCid, cb)
|
||||
mockPCAPI := &mockPreCommitsAPI{}
|
||||
mockDIAPI := &mockDealInfoAPI{
|
||||
CurrentDealInfo: data.currentDealInfo,
|
||||
CurrentDealInfo2: data.currentDealInfo2,
|
||||
Err: data.currentDealInfoErr,
|
||||
Err2: data.currentDealInfoErr2,
|
||||
}
|
||||
scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI)
|
||||
err = scm.OnDealSectorCommitted(ctx, provider, sectorNumber, proposal, publishCid, cb)
|
||||
if data.expectedError == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
@ -434,7 +461,6 @@ func TestOnDealSectorCommitted(t *testing.T) {
|
||||
type matchState struct {
|
||||
msg *types.Message
|
||||
receipt *types.MessageReceipt
|
||||
deals map[abi.DealID]*api.MarketDeal
|
||||
}
|
||||
|
||||
type matchMessage struct {
|
||||
@ -476,7 +502,8 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
|
||||
}
|
||||
more, err := msgHnd(matchMessage.msg, receipt, matchMessage.ts, matchMessage.curH)
|
||||
if err != nil {
|
||||
return err
|
||||
// error is handled through a callback rather than being returned
|
||||
return nil
|
||||
}
|
||||
if matchMessage.doesRevert {
|
||||
err := rev(fe.Ctx, matchMessage.ts)
|
||||
@ -514,3 +541,32 @@ func generateCids(n int) []cid.Cid {
|
||||
}
|
||||
return cids
|
||||
}
|
||||
|
||||
type mockPreCommitsAPI struct {
|
||||
PCChanges *miner.PreCommitChanges
|
||||
Err error
|
||||
}
|
||||
|
||||
func (m *mockPreCommitsAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
|
||||
pcc := &miner.PreCommitChanges{}
|
||||
if m.PCChanges != nil {
|
||||
pcc = m.PCChanges
|
||||
}
|
||||
return pcc, m.Err
|
||||
}
|
||||
|
||||
type mockDealInfoAPI struct {
|
||||
count int
|
||||
CurrentDealInfo sealing.CurrentDealInfo
|
||||
Err error
|
||||
CurrentDealInfo2 sealing.CurrentDealInfo
|
||||
Err2 error
|
||||
}
|
||||
|
||||
func (m *mockDealInfoAPI) GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error) {
|
||||
m.count++
|
||||
if m.count == 2 {
|
||||
return m.CurrentDealInfo2, m.Err2
|
||||
}
|
||||
return m.CurrentDealInfo, m.Err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
@ -22,7 +23,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
@ -33,6 +33,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/markets/utils"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
@ -42,7 +43,6 @@ var log = logging.Logger("storageadapter")
|
||||
|
||||
type ProviderNodeAdapter struct {
|
||||
api.FullNode
|
||||
*apiWrapper
|
||||
|
||||
// this goes away with the data transfer module
|
||||
dag dtypes.StagingDAG
|
||||
@ -50,57 +50,38 @@ type ProviderNodeAdapter struct {
|
||||
secb *sectorblocks.SectorBlocks
|
||||
ev *events.Events
|
||||
|
||||
publishSpec, addBalanceSpec *api.MessageSendSpec
|
||||
dealPublisher *DealPublisher
|
||||
|
||||
addBalanceSpec *api.MessageSendSpec
|
||||
dsMatcher *dealStateMatcher
|
||||
scMgr *SectorCommittedManager
|
||||
}
|
||||
|
||||
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
||||
return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
||||
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
ev := events.NewEvents(ctx, full)
|
||||
na := &ProviderNodeAdapter{
|
||||
FullNode: full,
|
||||
apiWrapper: &apiWrapper{api: full},
|
||||
|
||||
dag: dag,
|
||||
secb: secb,
|
||||
ev: events.NewEvents(context.TODO(), full),
|
||||
ev: ev,
|
||||
dealPublisher: dealPublisher,
|
||||
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))),
|
||||
}
|
||||
if fc != nil {
|
||||
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
|
||||
na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)}
|
||||
}
|
||||
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
|
||||
|
||||
return na
|
||||
}
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) {
|
||||
log.Info("publishing deal")
|
||||
|
||||
mi, err := n.StateMinerInfo(ctx, deal.Proposal.Provider, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{
|
||||
Deals: []market2.ClientDealProposal{deal.ClientDealProposal},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
||||
}
|
||||
|
||||
// TODO: We may want this to happen after fetching data
|
||||
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
|
||||
To: market.Address,
|
||||
From: mi.Worker,
|
||||
Value: types.NewInt(0),
|
||||
Method: market.Methods.PublishStorageDeals,
|
||||
Params: params,
|
||||
}, n.publishSpec)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
return smsg.Cid(), nil
|
||||
return n.dealPublisher.Publish(ctx, deal.ClientDealProposal)
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) {
|
||||
@ -280,12 +261,14 @@ func (n *ProviderNodeAdapter) DealProviderCollateralBounds(ctx context.Context,
|
||||
return bounds.Min, bounds.Max, nil
|
||||
}
|
||||
|
||||
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
|
||||
func (n *ProviderNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
|
||||
return OnDealSectorPreCommitted(ctx, n, n.ev, provider, dealID, market.DealProposal(proposal), publishCid, cb)
|
||||
return n.scMgr.OnDealSectorPreCommitted(ctx, provider, market.DealProposal(proposal), *publishCid, cb)
|
||||
}
|
||||
|
||||
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
|
||||
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
|
||||
return OnDealSectorCommitted(ctx, n, n.ev, provider, dealID, sectorNumber, market.DealProposal(proposal), publishCid, cb)
|
||||
return n.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, market.DealProposal(proposal), *publishCid, cb)
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
|
||||
|
@ -377,6 +377,7 @@ func Online() Option {
|
||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
|
||||
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
|
||||
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
||||
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
|
||||
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
|
||||
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
|
||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||
@ -519,6 +520,10 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
|
||||
),
|
||||
|
||||
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
|
||||
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
|
||||
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,
|
||||
})),
|
||||
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
|
||||
|
||||
Override(new(sectorstorage.SealerConfig), cfg.Storage),
|
||||
@ -646,5 +651,6 @@ func Test() Option {
|
||||
Unset(RunPeerMgrKey),
|
||||
Unset(new(*peermgr.PeerMgr)),
|
||||
Override(new(beacon.Schedule), testing.RandomBeacon),
|
||||
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
|
||||
)
|
||||
}
|
||||
|
@ -48,6 +48,12 @@ type DealmakingConfig struct {
|
||||
ConsiderUnverifiedStorageDeals bool
|
||||
PieceCidBlocklist []cid.Cid
|
||||
ExpectedSealDuration Duration
|
||||
// The amount of time to wait for more deals to arrive before
|
||||
// publishing
|
||||
PublishMsgPeriod Duration
|
||||
// The maximum number of deals to include in a single PublishStorageDeals
|
||||
// message
|
||||
MaxDealsPerPublishMsg uint64
|
||||
|
||||
Filter string
|
||||
RetrievalFilter string
|
||||
@ -209,6 +215,8 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
PieceCidBlocklist: []cid.Cid{},
|
||||
// TODO: It'd be nice to set this based on sector size
|
||||
ExpectedSealDuration: Duration(time.Hour * 24),
|
||||
PublishMsgPeriod: Duration(time.Hour),
|
||||
MaxDealsPerPublishMsg: 8,
|
||||
},
|
||||
|
||||
Fees: MinerFeeConfig{
|
||||
|
@ -54,8 +54,8 @@ type StateModuleAPI interface {
|
||||
StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
|
||||
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
|
||||
StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error)
|
||||
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
|
||||
StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
|
||||
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
|
||||
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
|
||||
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
|
||||
}
|
||||
|
@ -5,14 +5,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
builder "github.com/filecoin-project/lotus/node/test"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/lib/lotuslog"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
"github.com/filecoin-project/lotus/api/test"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/lib/lotuslog"
|
||||
builder "github.com/filecoin-project/lotus/node/test"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -57,8 +55,8 @@ func TestAPIDealFlow(t *testing.T) {
|
||||
t.Run("TestFastRetrievalDealFlow", func(t *testing.T) {
|
||||
test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||
})
|
||||
t.Run("TestZeroPricePerByteRetrievalDealFlow", func(t *testing.T) {
|
||||
test.TestZeroPricePerByteRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||
t.Run("TestPublishDealsBatching", func(t *testing.T) {
|
||||
test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -288,7 +288,11 @@ func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.
|
||||
genMiner := maddrs[i]
|
||||
wa := genms[i].Worker
|
||||
|
||||
storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options())
|
||||
opts := def.Opts
|
||||
if opts == nil {
|
||||
opts = node.Options()
|
||||
}
|
||||
storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, opts)
|
||||
if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil {
|
||||
t.Fatalf("%+v", err)
|
||||
}
|
||||
@ -455,12 +459,17 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
|
||||
}
|
||||
}
|
||||
|
||||
opts := def.Opts
|
||||
if opts == nil {
|
||||
opts = node.Options()
|
||||
}
|
||||
storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options(
|
||||
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
|
||||
return mock.NewMockSectorMgr(sectors), nil
|
||||
}),
|
||||
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
|
||||
node.Unset(new(*sectorstorage.Manager)),
|
||||
opts,
|
||||
))
|
||||
|
||||
if rpc {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
)
|
||||
|
||||
var log = logging.Logger("payment-channel-settler")
|
||||
@ -50,9 +51,10 @@ type paymentChannelSettler struct {
|
||||
|
||||
// SettlePaymentChannels checks the chain for events related to payment channels settling and
|
||||
// submits any vouchers for inbound channels tracked for this node
|
||||
func SettlePaymentChannels(lc fx.Lifecycle, api API) error {
|
||||
func SettlePaymentChannels(mctx helpers.MetricsCtx, lc fx.Lifecycle, api API) error {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
OnStart: func(context.Context) error {
|
||||
pcs := newPaymentChannelSettler(ctx, &api)
|
||||
ev := events.NewEvents(ctx, &api)
|
||||
return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
|
||||
|
Loading…
Reference in New Issue
Block a user