refactor: simplify deal start epoch timeout handling

This commit is contained in:
Dirk McCormick 2020-12-01 12:35:11 +01:00
parent e02a366419
commit 6bbb8ecafc
2 changed files with 32 additions and 96 deletions

View File

@ -8,7 +8,6 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -20,7 +19,6 @@ import (
type sectorCommittedEventsAPI interface { type sectorCommittedEventsAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error
} }
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
@ -33,9 +31,8 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
} }
// First check if the deal is already active, and if so, bail out // First check if the deal is already active, and if so, bail out
var proposedDealStartEpoch abi.ChainEpoch
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil { if err != nil {
// Note: the error returned from here will end up being returned // Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback // from OnDealSectorPreCommitted so no need to call the callback
@ -49,10 +46,6 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return true, false, nil return true, false, nil
} }
// Save the proposed deal start epoch so we can timeout if the deal
// hasn't been activated by that epoch
proposedDealStartEpoch = deal.Proposal.StartEpoch
// Not yet active, start matching against incoming messages // Not yet active, start matching against incoming messages
return false, true, nil return false, true, nil
} }
@ -63,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return matched, nil return matched, nil
} }
// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1
// Check if the message params included the deal ID we're looking for. // Check if the message params included the deal ID we're looking for.
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() { defer func() {
@ -71,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
} }
}() }()
// Check if waiting for pre-commit timed out // If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil { if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID) err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
} }
// Extract the message parameters // Extract the message parameters
@ -108,16 +107,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return nil return nil
} }
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err) return xerrors.Errorf("failed to set up called handler: %w", err)
} }
// If the deal hasn't been activated by the proposed start epoch, timeout
// the deal
timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) {
cb(0, false, err)
})
return nil return nil
} }
@ -131,9 +124,8 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
} }
// First check if the deal is already active, and if so, bail out // First check if the deal is already active, and if so, bail out
var proposedDealStartEpoch abi.ChainEpoch
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil { if err != nil {
// Note: the error returned from here will end up being returned // Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback // from OnDealSectorCommitted so no need to call the callback
@ -147,10 +139,6 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return true, false, nil return true, false, nil
} }
// Save the proposed deal start epoch so we can timeout if the deal
// hasn't been activated by that epoch
proposedDealStartEpoch = deal.Proposal.StartEpoch
// Not yet active, start matching against incoming messages // Not yet active, start matching against incoming messages
return false, true, nil return false, true, nil
} }
@ -169,6 +157,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return params.SectorNumber == sectorNumber, nil return params.SectorNumber == sectorNumber, nil
} }
// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
@ -176,9 +168,11 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
} }
}() }()
// Check if waiting for prove-commit timed out // If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil { if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID) err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
} }
// Get the deal info // Get the deal info
@ -205,61 +199,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil return nil
} }
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err) return xerrors.Errorf("failed to set up called handler: %w", err)
} }
// If the deal hasn't been activated by the proposed start epoch, timeout
// the deal
timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) {
cb(err)
})
return nil return nil
} }
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (*api.MarketDeal, bool, error) { func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) {
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil { if err != nil {
// TODO: This may be fine for some errors // TODO: This may be fine for some errors
return nil, false, xerrors.Errorf("failed to look up deal on chain: %w", err) return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
} }
// Sector with deal is already active // Sector with deal is already active
if sd.State.SectorStartEpoch > 0 { if sd.State.SectorStartEpoch > 0 {
return nil, true, nil return true, nil
} }
// Sector was slashed // Sector was slashed
if sd.State.SlashEpoch > 0 { if sd.State.SlashEpoch > 0 {
return nil, false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
} }
return sd, false, nil return false, nil
}
// Once the chain reaches the proposed deal start epoch, callback with an error.
// Note that the functions that call timeoutOnProposedStartEpoch will ignore
// the callback if it's already been called (ie if a pre-commit or commit
// message lands on chain before the proposed deal start epoch).
func timeoutOnProposedStartEpoch(dealID abi.DealID, proposedDealStartEpoch abi.ChainEpoch, api sectorCommittedEventsAPI, cb func(err error)) {
// Called when the chain height reaches deal start epoch + confidence
heightAt := func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
cb(xerrors.Errorf("deal %d was not activated by deal start epoch %d", dealID, proposedDealStartEpoch))
return nil
}
// If the chain reorgs after reaching the deal start epoch, it's very
// unlikely to reorg in such a way that the deal changes from
// "not activated" to "activated before deal start epoch", so just log a
// warning.
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warnf("deal %d had reached start epoch %d but the chain reorged", dealID, proposedDealStartEpoch)
return nil
}
err := api.ChainAt(heightAt, revert, int(build.MessageConfidence+1), proposedDealStartEpoch+1)
if err != nil {
cb(xerrors.Errorf("error waiting for deal %d to become activated: %w", dealID, err))
}
} }

View File

@ -60,7 +60,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
searchMessageErr error searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState matchStates []matchState
chainAtErr error
dealStartEpochTimeout bool dealStartEpochTimeout bool
expectedCBCallCount uint64 expectedCBCallCount uint64
expectedCBSectorNumber abi.SectorNumber expectedCBSectorNumber abi.SectorNumber
@ -147,21 +146,13 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: something went wrong"), expectedCBError: errors.New("handling applied event: something went wrong"),
expectedError: errors.New("failed to set up called handler: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"),
}, },
"chainAt error": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
chainAtErr: errors.New("chain at err"),
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID),
},
"proposed deal epoch timeout": { "proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{ checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal, startDealID: unfinishedDeal,
}, },
dealStartEpochTimeout: true, dealStartEpochTimeout: true,
expectedCBCallCount: 1, expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
}, },
} }
runTestCase := func(testCase string, data testCase) { runTestCase := func(testCase string, data testCase) {
@ -196,7 +187,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
Ctx: ctx, Ctx: ctx,
CheckTs: checkTs, CheckTs: checkTs,
MatchMessages: matchMessages, MatchMessages: matchMessages,
ChainAtErr: data.chainAtErr,
DealStartEpochTimeout: data.dealStartEpochTimeout, DealStartEpochTimeout: data.dealStartEpochTimeout,
} }
cbCallCount := uint64(0) cbCallCount := uint64(0)
@ -263,7 +253,6 @@ func TestOnDealSectorCommitted(t *testing.T) {
searchMessageErr error searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState matchStates []matchState
chainAtErr error
dealStartEpochTimeout bool dealStartEpochTimeout bool
expectedCBCallCount uint64 expectedCBCallCount uint64
expectedCBError error expectedCBError error
@ -339,21 +328,13 @@ func TestOnDealSectorCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"),
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
}, },
"chainAt error": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
chainAtErr: errors.New("chain at err"),
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID),
},
"proposed deal epoch timeout": { "proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{ checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal, startDealID: unfinishedDeal,
}, },
dealStartEpochTimeout: true, dealStartEpochTimeout: true,
expectedCBCallCount: 1, expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
}, },
} }
runTestCase := func(testCase string, data testCase) { runTestCase := func(testCase string, data testCase) {
@ -388,7 +369,6 @@ func TestOnDealSectorCommitted(t *testing.T) {
Ctx: ctx, Ctx: ctx,
CheckTs: checkTs, CheckTs: checkTs,
MatchMessages: matchMessages, MatchMessages: matchMessages,
ChainAtErr: data.chainAtErr,
DealStartEpochTimeout: data.dealStartEpochTimeout, DealStartEpochTimeout: data.dealStartEpochTimeout,
} }
cbCallCount := uint64(0) cbCallCount := uint64(0)
@ -432,11 +412,15 @@ type fakeEvents struct {
Ctx context.Context Ctx context.Context
CheckTs *types.TipSet CheckTs *types.TipSet
MatchMessages []matchMessage MatchMessages []matchMessage
ChainAtErr error
DealStartEpochTimeout bool DealStartEpochTimeout bool
} }
func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
if fe.DealStartEpochTimeout {
msgHnd(nil, nil, nil, 100) // nolint:errcheck
return nil
}
_, more, err := check(fe.CheckTs) _, more, err := check(fe.CheckTs)
if err != nil { if err != nil {
return err return err
@ -468,16 +452,6 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
return nil return nil
} }
func (fe *fakeEvents) ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error {
if fe.ChainAtErr != nil {
return fe.ChainAtErr
}
if fe.DealStartEpochTimeout {
_ = hnd(context.Background(), nil, abi.ChainEpoch(0))
}
return nil
}
func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message { func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err := params.MarshalCBOR(buf) err := params.MarshalCBOR(buf)