From 6bbb8ecafcdfa40481bf9034e4a041e084b2d12e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 1 Dec 2020 12:35:11 +0100 Subject: [PATCH] refactor: simplify deal start epoch timeout handling --- .../storageadapter/ondealsectorcommitted.go | 88 ++++++------------- .../ondealsectorcommitted_test.go | 40 ++------- 2 files changed, 32 insertions(+), 96 deletions(-) diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 4e85dcdb4..4980c233c 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -20,7 +19,6 @@ import ( type sectorCommittedEventsAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error - ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error } func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { @@ -33,9 +31,8 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } // First check if the deal is already active, and if so, bail out - var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -49,10 +46,6 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } - // Save the proposed deal start epoch so we can timeout if the deal - // hasn't been activated by that epoch - proposedDealStartEpoch = deal.Proposal.StartEpoch - // Not yet active, start matching against incoming messages return false, true, nil } @@ -63,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return matched, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + // Check if the message params included the deal ID we're looking for. called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { @@ -71,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } }() - // Check if waiting for pre-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID) + err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Extract the message parameters @@ -108,16 +107,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } - // If the deal hasn't been activated by the proposed start epoch, timeout - // the deal - timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { - cb(0, false, err) - }) - return nil } @@ -131,9 +124,8 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // First check if the deal is already active, and if so, bail out - var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -147,10 +139,6 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return true, false, nil } - // Save the proposed deal start epoch so we can timeout if the deal - // hasn't been activated by that epoch - proposedDealStartEpoch = deal.Proposal.StartEpoch - // Not yet active, start matching against incoming messages return false, true, nil } @@ -169,6 +157,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return params.SectorNumber == sectorNumber, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { if err != nil { @@ -176,9 +168,11 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } }() - // Check if waiting for prove-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID) + err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Get the deal info @@ -205,61 +199,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } - // If the deal hasn't been activated by the proposed start epoch, timeout - // the deal - timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { - cb(err) - }) - return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (*api.MarketDeal, bool, error) { +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return nil, false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active if sd.State.SectorStartEpoch > 0 { - return nil, true, nil + return true, nil } // Sector was slashed if sd.State.SlashEpoch > 0 { - return nil, false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) } - return sd, false, nil -} - -// Once the chain reaches the proposed deal start epoch, callback with an error. -// Note that the functions that call timeoutOnProposedStartEpoch will ignore -// the callback if it's already been called (ie if a pre-commit or commit -// message lands on chain before the proposed deal start epoch). -func timeoutOnProposedStartEpoch(dealID abi.DealID, proposedDealStartEpoch abi.ChainEpoch, api sectorCommittedEventsAPI, cb func(err error)) { - // Called when the chain height reaches deal start epoch + confidence - heightAt := func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { - cb(xerrors.Errorf("deal %d was not activated by deal start epoch %d", dealID, proposedDealStartEpoch)) - return nil - } - - // If the chain reorgs after reaching the deal start epoch, it's very - // unlikely to reorg in such a way that the deal changes from - // "not activated" to "activated before deal start epoch", so just log a - // warning. - revert := func(ctx context.Context, ts *types.TipSet) error { - log.Warnf("deal %d had reached start epoch %d but the chain reorged", dealID, proposedDealStartEpoch) - return nil - } - - err := api.ChainAt(heightAt, revert, int(build.MessageConfidence+1), proposedDealStartEpoch+1) - if err != nil { - cb(xerrors.Errorf("error waiting for deal %d to become activated: %w", dealID, err)) - } + return false, nil } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index e746f6f99..30fbfea76 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -60,7 +60,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) { searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState - chainAtErr error dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBSectorNumber abi.SectorNumber @@ -147,21 +146,13 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"), }, - "chainAt error": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - chainAtErr: errors.New("chain at err"), - expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), - }, "proposed deal epoch timeout": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), }, } runTestCase := func(testCase string, data testCase) { @@ -196,7 +187,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) { Ctx: ctx, CheckTs: checkTs, MatchMessages: matchMessages, - ChainAtErr: data.chainAtErr, DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) @@ -263,7 +253,6 @@ func TestOnDealSectorCommitted(t *testing.T) { searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState - chainAtErr error dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBError error @@ -339,21 +328,13 @@ func TestOnDealSectorCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), }, - "chainAt error": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - chainAtErr: errors.New("chain at err"), - expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), - }, "proposed deal epoch timeout": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), }, } runTestCase := func(testCase string, data testCase) { @@ -388,7 +369,6 @@ func TestOnDealSectorCommitted(t *testing.T) { Ctx: ctx, CheckTs: checkTs, MatchMessages: matchMessages, - ChainAtErr: data.chainAtErr, DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) @@ -432,11 +412,15 @@ type fakeEvents struct { Ctx context.Context CheckTs *types.TipSet MatchMessages []matchMessage - ChainAtErr error DealStartEpochTimeout bool } func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { + if fe.DealStartEpochTimeout { + msgHnd(nil, nil, nil, 100) // nolint:errcheck + return nil + } + _, more, err := check(fe.CheckTs) if err != nil { return err @@ -468,16 +452,6 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r return nil } -func (fe *fakeEvents) ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error { - if fe.ChainAtErr != nil { - return fe.ChainAtErr - } - if fe.DealStartEpochTimeout { - _ = hnd(context.Background(), nil, abi.ChainEpoch(0)) - } - return nil -} - func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message { buf := new(bytes.Buffer) err := params.MarshalCBOR(buf)