refactor: no need to check for prove-commit from OnDealSectorPreCommitted
This commit is contained in:
parent
47a4128311
commit
cd7580e6d8
@ -20,12 +20,7 @@ type sectorCommittedEventsAPI interface {
|
|||||||
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
|
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type sectorPreCommitAPI interface {
|
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
|
||||||
getCurrentDealInfoAPI
|
|
||||||
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func OnDealSectorPreCommitted(ctx context.Context, api sectorPreCommitAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
|
|
||||||
// First check if the deal is already active, and if so, bail out
|
// First check if the deal is already active, and if so, bail out
|
||||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
||||||
@ -46,23 +41,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api sectorPreCommitAPI, event
|
|||||||
return false, true, nil
|
return false, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch for a pre-commit or prove-commit message to the provider.
|
// Watch for a pre-commit message to the provider.
|
||||||
// It's possible (when the node restarts) that the pre-commit has already
|
matchEvent := func(msg *types.Message) (bool, error) {
|
||||||
// been submitted, in which case we wait for the prove-commit message,
|
matched := msg.To == provider && msg.Method == miner.Methods.PreCommitSector
|
||||||
// so match both pre-commit and prove-commit messages.
|
return matched, nil
|
||||||
matchEvent := func(msg *types.Message) (matched bool, err error) {
|
|
||||||
if msg.To != provider {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch msg.Method {
|
|
||||||
case miner.Methods.PreCommitSector:
|
|
||||||
return true, nil
|
|
||||||
case miner.Methods.ProveCommitSector:
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the message params included the deal ID we're looking for.
|
// Check if the message params included the deal ID we're looking for.
|
||||||
@ -78,38 +60,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api sectorPreCommitAPI, event
|
|||||||
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID)
|
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID)
|
||||||
}
|
}
|
||||||
|
|
||||||
var sectorNumber abi.SectorNumber
|
// Extract the message parameters
|
||||||
var msgDealIDs []abi.DealID
|
|
||||||
if msg.Method == miner.Methods.PreCommitSector {
|
|
||||||
// If it's a pre-commit message, the deal IDs are in the message parameters
|
|
||||||
var params miner.SectorPreCommitInfo
|
var params miner.SectorPreCommitInfo
|
||||||
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
|
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
|
||||||
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
|
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
|
||||||
}
|
}
|
||||||
msgDealIDs = params.DealIDs
|
|
||||||
sectorNumber = params.SectorNumber
|
|
||||||
} else {
|
|
||||||
// If it's a prove-commit message, the parameters don't have deal IDs,
|
|
||||||
// just a sector number
|
|
||||||
var params miner.ProveCommitSectorParams
|
|
||||||
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
|
|
||||||
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Look up the sector number in miner state to get the deal IDs
|
|
||||||
sectorNumber = params.SectorNumber
|
|
||||||
sectorInfo, err := api.StateSectorGetInfo(ctx, provider, sectorNumber, ts.Key())
|
|
||||||
if err != nil {
|
|
||||||
return false, xerrors.Errorf("failed to get sector info for sector %d: %w", sectorNumber, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there is no sector info for this sector, ignore it
|
|
||||||
if sectorInfo == nil {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
msgDealIDs = sectorInfo.DealIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
// When the deal is published, the deal ID may change, so get the
|
// When the deal is published, the deal ID may change, so get the
|
||||||
// current deal ID from the publish message CID
|
// current deal ID from the publish message CID
|
||||||
@ -119,12 +74,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api sectorPreCommitAPI, event
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check through the deal IDs associated with this message
|
// Check through the deal IDs associated with this message
|
||||||
for _, did := range msgDealIDs {
|
for _, did := range params.DealIDs {
|
||||||
if did == dealID {
|
if did == dealID {
|
||||||
// Found the deal ID in this message. Callback with the sector ID.
|
// Found the deal ID in this message. Callback with the sector ID.
|
||||||
// If the message is a prove-commit, then the sector is already active.
|
cb(params.SectorNumber, false, nil)
|
||||||
isActive := msg.Method == miner.Methods.ProveCommitSector
|
|
||||||
cb(sectorNumber, isActive, nil)
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
searchMessageErr error
|
searchMessageErr error
|
||||||
checkTsDeals map[abi.DealID]*api.MarketDeal
|
checkTsDeals map[abi.DealID]*api.MarketDeal
|
||||||
matchStates []matchState
|
matchStates []matchState
|
||||||
sectorInfo *miner.SectorOnChainInfo
|
|
||||||
expectedCBCallCount uint64
|
expectedCBCallCount uint64
|
||||||
expectedCBSectorNumber abi.SectorNumber
|
expectedCBSectorNumber abi.SectorNumber
|
||||||
expectedCBIsActive bool
|
expectedCBIsActive bool
|
||||||
@ -85,7 +84,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
expectedCBIsActive: false,
|
expectedCBIsActive: false,
|
||||||
expectedCBSectorNumber: sectorNumber,
|
expectedCBSectorNumber: sectorNumber,
|
||||||
},
|
},
|
||||||
"pre-commit, deal id changes in called": {
|
"deal id changes in called": {
|
||||||
searchMessageLookup: &api.MsgLookup{
|
searchMessageLookup: &api.MsgLookup{
|
||||||
Receipt: types.MessageReceipt{
|
Receipt: types.MessageReceipt{
|
||||||
ExitCode: exitcode.Ok,
|
ExitCode: exitcode.Ok,
|
||||||
@ -111,49 +110,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
expectedCBIsActive: false,
|
expectedCBIsActive: false,
|
||||||
expectedCBSectorNumber: sectorNumber,
|
expectedCBSectorNumber: sectorNumber,
|
||||||
},
|
},
|
||||||
"prove-commit, deal id changes in called": {
|
|
||||||
searchMessageLookup: &api.MsgLookup{
|
|
||||||
Receipt: types.MessageReceipt{
|
|
||||||
ExitCode: exitcode.Ok,
|
|
||||||
Return: newValueReturn,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
|
||||||
newDealID: unfinishedDeal,
|
|
||||||
},
|
|
||||||
matchStates: []matchState{
|
|
||||||
{
|
|
||||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
|
||||||
SectorNumber: sectorNumber,
|
|
||||||
}),
|
|
||||||
deals: map[abi.DealID]*api.MarketDeal{
|
|
||||||
newDealID: successDeal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
sectorInfo: &miner.SectorOnChainInfo{
|
|
||||||
DealIDs: []abi.DealID{newDealID},
|
|
||||||
},
|
|
||||||
expectedCBCallCount: 1,
|
|
||||||
expectedCBIsActive: true,
|
|
||||||
expectedCBSectorNumber: sectorNumber,
|
|
||||||
},
|
|
||||||
"prove-commit but no sector info": {
|
|
||||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{
|
|
||||||
startDealID: unfinishedDeal,
|
|
||||||
},
|
|
||||||
matchStates: []matchState{
|
|
||||||
{
|
|
||||||
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
|
|
||||||
SectorNumber: sectorNumber,
|
|
||||||
}),
|
|
||||||
deals: map[abi.DealID]*api.MarketDeal{
|
|
||||||
startDealID: successDeal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectedCBCallCount: 0,
|
|
||||||
},
|
|
||||||
"error on deal in check": {
|
"error on deal in check": {
|
||||||
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
|
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
|
||||||
searchMessageErr: errors.New("something went wrong"),
|
searchMessageErr: errors.New("something went wrong"),
|
||||||
@ -192,7 +148,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
searchMessageErr error
|
searchMessageErr error
|
||||||
checkTsDeals map[abi.DealID]*api.MarketDeal
|
checkTsDeals map[abi.DealID]*api.MarketDeal
|
||||||
matchStates []matchState
|
matchStates []matchState
|
||||||
sectorInfo *miner.SectorOnChainInfo
|
|
||||||
expectedCBCallCount uint64
|
expectedCBCallCount uint64
|
||||||
expectedCBSectorNumber abi.SectorNumber
|
expectedCBSectorNumber abi.SectorNumber
|
||||||
expectedCBIsActive bool
|
expectedCBIsActive bool
|
||||||
@ -202,12 +157,10 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
t.Run(testCase, func(t *testing.T) {
|
t.Run(testCase, func(t *testing.T) {
|
||||||
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
// defer cancel()
|
// defer cancel()
|
||||||
api := &mockSectorPreCommitAPI{
|
api := &mockGetCurrentDealInfoAPI{
|
||||||
mockGetCurrentDealInfoAPI: mockGetCurrentDealInfoAPI{
|
|
||||||
SearchMessageLookup: data.searchMessageLookup,
|
SearchMessageLookup: data.searchMessageLookup,
|
||||||
SearchMessageErr: data.searchMessageErr,
|
SearchMessageErr: data.searchMessageErr,
|
||||||
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
|
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
|
||||||
},
|
|
||||||
}
|
}
|
||||||
checkTs, err := test.MockTipset(provider, rand.Uint64())
|
checkTs, err := test.MockTipset(provider, rand.Uint64())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -228,7 +181,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
ts: matchTs,
|
ts: matchTs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
api.SectorInfo = data.sectorInfo
|
|
||||||
eventsAPI := &fakeEvents{
|
eventsAPI := &fakeEvents{
|
||||||
Ctx: ctx,
|
Ctx: ctx,
|
||||||
CheckTs: checkTs,
|
CheckTs: checkTs,
|
||||||
@ -499,15 +451,6 @@ func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockSectorPreCommitAPI struct {
|
|
||||||
mockGetCurrentDealInfoAPI
|
|
||||||
SectorInfo *miner.SectorOnChainInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mapi *mockSectorPreCommitAPI) StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) {
|
|
||||||
return mapi.SectorInfo, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var seq int
|
var seq int
|
||||||
|
|
||||||
func generateCids(n int) []cid.Cid {
|
func generateCids(n int) []cid.Cid {
|
||||||
|
Loading…
Reference in New Issue
Block a user