From e02a36641986b98ac05c9b1957833f0e5b158d6a Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 30 Nov 2020 15:16:13 +0100 Subject: [PATCH 1/5] feat: error out deals that are not activated by proposed deal start epoch --- chain/stmgr/utils.go | 6 +- .../storageadapter/ondealsectorcommitted.go | 91 ++++++++++++-- .../ondealsectorcommitted_test.go | 118 ++++++++++++------ 3 files changed, 166 insertions(+), 49 deletions(-) diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index fb0b91378..1e29e72d8 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -300,7 +300,11 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts if err != nil { return nil, err } else if !found { - return nil, xerrors.Errorf("deal %d not found", dealID) + return nil, xerrors.Errorf( + "deal %d not found "+ + "- deal may not have completed sealing before deal proposal "+ + "start epoch, or deal may have been slashed", + dealID) } states, err := state.States() diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 85cc8f97b..4e85dcdb4 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -3,10 +3,12 @@ package storageadapter import ( "bytes" "context" + "sync" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -18,12 +20,22 @@ import ( type sectorCommittedEventsAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error + ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error } -func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { +func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { + // Ensure callback is only called once + var once sync.Once + cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) { + once.Do(func() { + callback(sectorNumber, isActive, err) + }) + } + // First check if the deal is already active, and if so, bail out + var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -37,6 +49,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } + // Save the proposed deal start epoch so we can timeout if the deal + // hasn't been activated by that epoch + proposedDealStartEpoch = deal.Proposal.StartEpoch + // Not yet active, start matching against incoming messages return false, true, nil } @@ -96,13 +112,28 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return xerrors.Errorf("failed to set up called handler: %w", err) } + // If the deal hasn't been activated by the proposed start epoch, timeout + // the deal + timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { + cb(0, false, err) + }) + return nil } -func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { +func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { + // Ensure callback is only called once + var once sync.Once + cb := func(err error) { + once.Do(func() { + callback(err) + }) + } + // First check if the deal is already active, and if so, bail out + var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -116,6 +147,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return true, false, nil } + // Save the proposed deal start epoch so we can timeout if the deal + // hasn't been activated by that epoch + proposedDealStartEpoch = deal.Proposal.StartEpoch + // Not yet active, start matching against incoming messages return false, true, nil } @@ -174,17 +209,57 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return xerrors.Errorf("failed to set up called handler: %w", err) } + // If the deal hasn't been activated by the proposed start epoch, timeout + // the deal + timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { + cb(err) + }) + return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (*api.MarketDeal, bool, error) { _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return nil, false, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active - isActive := sd.State.SectorStartEpoch > 0 - return isActive, nil + if sd.State.SectorStartEpoch > 0 { + return nil, true, nil + } + + // Sector was slashed + if sd.State.SlashEpoch > 0 { + return nil, false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + } + + return sd, false, nil +} + +// Once the chain reaches the proposed deal start epoch, callback with an error. +// Note that the functions that call timeoutOnProposedStartEpoch will ignore +// the callback if it's already been called (ie if a pre-commit or commit +// message lands on chain before the proposed deal start epoch). +func timeoutOnProposedStartEpoch(dealID abi.DealID, proposedDealStartEpoch abi.ChainEpoch, api sectorCommittedEventsAPI, cb func(err error)) { + // Called when the chain height reaches deal start epoch + confidence + heightAt := func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { + cb(xerrors.Errorf("deal %d was not activated by deal start epoch %d", dealID, proposedDealStartEpoch)) + return nil + } + + // If the chain reorgs after reaching the deal start epoch, it's very + // unlikely to reorg in such a way that the deal changes from + // "not activated" to "activated before deal start epoch", so just log a + // warning. + revert := func(ctx context.Context, ts *types.TipSet) error { + log.Warnf("deal %d had reached start epoch %d but the chain reorged", dealID, proposedDealStartEpoch) + return nil + } + + err := api.ChainAt(heightAt, revert, int(build.MessageConfidence+1), proposedDealStartEpoch+1) + if err != nil { + cb(xerrors.Errorf("error waiting for deal %d to become activated: %w", dealID, err)) + } } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index 0ffc2ff3e..e746f6f99 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -8,6 +8,8 @@ import ( "math/rand" "testing" + "golang.org/x/xerrors" + blocks "github.com/ipfs/go-block-format" "github.com/filecoin-project/go-address" @@ -53,17 +55,20 @@ func TestOnDealSectorPreCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - testCases := map[string]struct { + type testCase struct { searchMessageLookup *api.MsgLookup searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState + chainAtErr error + dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBSectorNumber abi.SectorNumber expectedCBIsActive bool expectedCBError error expectedError error - }{ + } + testCases := map[string]testCase{ "normal sequence": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, @@ -142,18 +147,24 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"), }, + "chainAt error": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + chainAtErr: errors.New("chain at err"), + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), + }, + "proposed deal epoch timeout": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + dealStartEpochTimeout: true, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + }, } - runTestCase := func(testCase string, data struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBSectorNumber abi.SectorNumber - expectedCBIsActive bool - expectedCBError error - expectedError error - }) { + runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // defer cancel() @@ -182,9 +193,11 @@ func TestOnDealSectorPreCommitted(t *testing.T) { } } eventsAPI := &fakeEvents{ - Ctx: ctx, - CheckTs: checkTs, - MatchMessages: matchMessages, + Ctx: ctx, + CheckTs: checkTs, + MatchMessages: matchMessages, + ChainAtErr: data.chainAtErr, + DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) var cbSectorNumber abi.SectorNumber @@ -245,15 +258,18 @@ func TestOnDealSectorCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - testCases := map[string]struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBError error - expectedError error - }{ + type testCase struct { + searchMessageLookup *api.MsgLookup + searchMessageErr error + checkTsDeals map[abi.DealID]*api.MarketDeal + matchStates []matchState + chainAtErr error + dealStartEpochTimeout bool + expectedCBCallCount uint64 + expectedCBError error + expectedError error + } + testCases := map[string]testCase{ "normal sequence": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, @@ -323,16 +339,24 @@ func TestOnDealSectorCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), }, + "chainAt error": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + chainAtErr: errors.New("chain at err"), + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), + }, + "proposed deal epoch timeout": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + dealStartEpochTimeout: true, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + }, } - runTestCase := func(testCase string, data struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBError error - expectedError error - }) { + runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // defer cancel() @@ -361,9 +385,11 @@ func TestOnDealSectorCommitted(t *testing.T) { } } eventsAPI := &fakeEvents{ - Ctx: ctx, - CheckTs: checkTs, - MatchMessages: matchMessages, + Ctx: ctx, + CheckTs: checkTs, + MatchMessages: matchMessages, + ChainAtErr: data.chainAtErr, + DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) var cbError error @@ -403,9 +429,11 @@ type matchMessage struct { doesRevert bool } type fakeEvents struct { - Ctx context.Context - CheckTs *types.TipSet - MatchMessages []matchMessage + Ctx context.Context + CheckTs *types.TipSet + MatchMessages []matchMessage + ChainAtErr error + DealStartEpochTimeout bool } func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { @@ -440,6 +468,16 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r return nil } +func (fe *fakeEvents) ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error { + if fe.ChainAtErr != nil { + return fe.ChainAtErr + } + if fe.DealStartEpochTimeout { + _ = hnd(context.Background(), nil, abi.ChainEpoch(0)) + } + return nil +} + func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message { buf := new(bytes.Buffer) err := params.MarshalCBOR(buf) From 6bbb8ecafcdfa40481bf9034e4a041e084b2d12e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 1 Dec 2020 12:35:11 +0100 Subject: [PATCH 2/5] refactor: simplify deal start epoch timeout handling --- .../storageadapter/ondealsectorcommitted.go | 88 ++++++------------- .../ondealsectorcommitted_test.go | 40 ++------- 2 files changed, 32 insertions(+), 96 deletions(-) diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 4e85dcdb4..4980c233c 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -20,7 +19,6 @@ import ( type sectorCommittedEventsAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error - ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error } func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { @@ -33,9 +31,8 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } // First check if the deal is already active, and if so, bail out - var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -49,10 +46,6 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } - // Save the proposed deal start epoch so we can timeout if the deal - // hasn't been activated by that epoch - proposedDealStartEpoch = deal.Proposal.StartEpoch - // Not yet active, start matching against incoming messages return false, true, nil } @@ -63,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return matched, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + // Check if the message params included the deal ID we're looking for. called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { @@ -71,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } }() - // Check if waiting for pre-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID) + err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Extract the message parameters @@ -108,16 +107,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } - // If the deal hasn't been activated by the proposed start epoch, timeout - // the deal - timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { - cb(0, false, err) - }) - return nil } @@ -131,9 +124,8 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // First check if the deal is already active, and if so, bail out - var proposedDealStartEpoch abi.ChainEpoch checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -147,10 +139,6 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return true, false, nil } - // Save the proposed deal start epoch so we can timeout if the deal - // hasn't been activated by that epoch - proposedDealStartEpoch = deal.Proposal.StartEpoch - // Not yet active, start matching against incoming messages return false, true, nil } @@ -169,6 +157,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return params.SectorNumber == sectorNumber, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { if err != nil { @@ -176,9 +168,11 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } }() - // Check if waiting for prove-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID) + err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Get the deal info @@ -205,61 +199,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } - // If the deal hasn't been activated by the proposed start epoch, timeout - // the deal - timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) { - cb(err) - }) - return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (*api.MarketDeal, bool, error) { +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return nil, false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active if sd.State.SectorStartEpoch > 0 { - return nil, true, nil + return true, nil } // Sector was slashed if sd.State.SlashEpoch > 0 { - return nil, false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) } - return sd, false, nil -} - -// Once the chain reaches the proposed deal start epoch, callback with an error. -// Note that the functions that call timeoutOnProposedStartEpoch will ignore -// the callback if it's already been called (ie if a pre-commit or commit -// message lands on chain before the proposed deal start epoch). -func timeoutOnProposedStartEpoch(dealID abi.DealID, proposedDealStartEpoch abi.ChainEpoch, api sectorCommittedEventsAPI, cb func(err error)) { - // Called when the chain height reaches deal start epoch + confidence - heightAt := func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { - cb(xerrors.Errorf("deal %d was not activated by deal start epoch %d", dealID, proposedDealStartEpoch)) - return nil - } - - // If the chain reorgs after reaching the deal start epoch, it's very - // unlikely to reorg in such a way that the deal changes from - // "not activated" to "activated before deal start epoch", so just log a - // warning. - revert := func(ctx context.Context, ts *types.TipSet) error { - log.Warnf("deal %d had reached start epoch %d but the chain reorged", dealID, proposedDealStartEpoch) - return nil - } - - err := api.ChainAt(heightAt, revert, int(build.MessageConfidence+1), proposedDealStartEpoch+1) - if err != nil { - cb(xerrors.Errorf("error waiting for deal %d to become activated: %w", dealID, err)) - } + return false, nil } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index e746f6f99..30fbfea76 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -60,7 +60,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) { searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState - chainAtErr error dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBSectorNumber abi.SectorNumber @@ -147,21 +146,13 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"), }, - "chainAt error": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - chainAtErr: errors.New("chain at err"), - expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), - }, "proposed deal epoch timeout": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), }, } runTestCase := func(testCase string, data testCase) { @@ -196,7 +187,6 @@ func TestOnDealSectorPreCommitted(t *testing.T) { Ctx: ctx, CheckTs: checkTs, MatchMessages: matchMessages, - ChainAtErr: data.chainAtErr, DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) @@ -263,7 +253,6 @@ func TestOnDealSectorCommitted(t *testing.T) { searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState - chainAtErr error dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBError error @@ -339,21 +328,13 @@ func TestOnDealSectorCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), }, - "chainAt error": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - chainAtErr: errors.New("chain at err"), - expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID), - }, "proposed deal epoch timeout": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), }, } runTestCase := func(testCase string, data testCase) { @@ -388,7 +369,6 @@ func TestOnDealSectorCommitted(t *testing.T) { Ctx: ctx, CheckTs: checkTs, MatchMessages: matchMessages, - ChainAtErr: data.chainAtErr, DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) @@ -432,11 +412,15 @@ type fakeEvents struct { Ctx context.Context CheckTs *types.TipSet MatchMessages []matchMessage - ChainAtErr error DealStartEpochTimeout bool } func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { + if fe.DealStartEpochTimeout { + msgHnd(nil, nil, nil, 100) // nolint:errcheck + return nil + } + _, more, err := check(fe.CheckTs) if err != nil { return err @@ -468,16 +452,6 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r return nil } -func (fe *fakeEvents) ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error { - if fe.ChainAtErr != nil { - return fe.ChainAtErr - } - if fe.DealStartEpochTimeout { - _ = hnd(context.Background(), nil, abi.ChainEpoch(0)) - } - return nil -} - func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message { buf := new(bytes.Buffer) err := params.MarshalCBOR(buf) From cb044f83a79952d1b0389960294ad874314a993d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 1 Dec 2020 14:00:28 +0100 Subject: [PATCH 3/5] fix: ensure deal start is far enough in future for tests --- api/test/ccupgrade.go | 2 +- api/test/deals.go | 29 ++++++++++++++----------- api/test/mining.go | 2 +- cmd/lotus-gateway/endtoend_test.go | 6 ++++- cmd/lotus-storage-miner/allinfo_test.go | 2 +- node/node_test.go | 21 ++++++++++++------ 6 files changed, 38 insertions(+), 24 deletions(-) diff --git a/api/test/ccupgrade.go b/api/test/ccupgrade.go index 4f6b39701..eedcec6ca 100644 --- a/api/test/ccupgrade.go +++ b/api/test/ccupgrade.go @@ -89,7 +89,7 @@ func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeH t.Fatal(err) } - MakeDeal(t, ctx, 6, client, miner, false, false) + MakeDeal(t, ctx, 6, client, miner, false, false, 0) // Validate upgrade diff --git a/api/test/deals.go b/api/test/deals.go index d2cabadc6..1189f070e 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/filecoin-project/go-state-types/abi" + "github.com/stretchr/testify/require" "github.com/ipfs/go-cid" @@ -31,7 +33,7 @@ import ( ipld "github.com/ipfs/go-ipld-format" ) -func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool) { +func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -60,14 +62,14 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport } }() - MakeDeal(t, ctx, 6, client, miner, carExport, fastRet) + MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch) atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } -func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -97,15 +99,15 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { } }() - MakeDeal(t, ctx, 6, client, miner, false, false) - MakeDeal(t, ctx, 7, client, miner, false, false) + MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch) + MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch) atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } -func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool) { +func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) { res, data, err := CreateClientFile(ctx, client, rseed) if err != nil { t.Fatal(err) @@ -114,7 +116,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, fcid := res.Root fmt.Println("FILE CID: ", fcid) - deal := startDeal(t, ctx, miner, client, fcid, fastRet) + deal := startDeal(t, ctx, miner, client, fcid, fastRet, startEpoch) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) @@ -149,7 +151,7 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api return res, data, nil } -func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -189,7 +191,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati fmt.Println("FILE CID: ", fcid) - deal := startDeal(t, ctx, miner, client, fcid, true) + deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch) waitDealPublished(t, ctx, miner, deal) fmt.Println("deal published, retrieving") @@ -203,7 +205,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati <-done } -func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -252,13 +254,13 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration t.Fatal(err) } - deal1 := startDeal(t, ctx, miner, client, fcid1, true) + deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) waitDealSealed(t, ctx, miner, client, deal1, true) - deal2 := startDeal(t, ctx, miner, client, fcid2, true) + deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0) time.Sleep(time.Second) waitDealSealed(t, ctx, miner, client, deal2, false) @@ -278,7 +280,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration <-done } -func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool) *cid.Cid { +func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid { maddr, err := miner.ActorAddress(ctx) if err != nil { t.Fatal(err) @@ -296,6 +298,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client Wallet: addr, Miner: maddr, EpochPrice: types.NewInt(1000000), + DealStartEpoch: startEpoch, MinBlocksDuration: uint64(build.MinDealDuration), FastRetrieval: fastRet, }) diff --git a/api/test/mining.go b/api/test/mining.go index 11953b95d..8f3689333 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -186,7 +186,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo } }() - deal := startDeal(t, ctx, provider, client, fcid, false) + deal := startDeal(t, ctx, provider, client, fcid, false, 0) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) diff --git a/cmd/lotus-gateway/endtoend_test.go b/cmd/lotus-gateway/endtoend_test.go index f0b950f5e..4d5e88c82 100644 --- a/cmd/lotus-gateway/endtoend_test.go +++ b/cmd/lotus-gateway/endtoend_test.go @@ -171,7 +171,11 @@ func TestDealFlow(t *testing.T) { nodes := startNodesWithFunds(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit) defer nodes.closer() - test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false) + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + dealStartEpoch := abi.ChainEpoch(2 << 12) + test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false, dealStartEpoch) } func TestCLIDealFlow(t *testing.T) { diff --git a/cmd/lotus-storage-miner/allinfo_test.go b/cmd/lotus-storage-miner/allinfo_test.go index a458c024b..51aba14a9 100644 --- a/cmd/lotus-storage-miner/allinfo_test.go +++ b/cmd/lotus-storage-miner/allinfo_test.go @@ -70,7 +70,7 @@ func TestMinerAllInfo(t *testing.T) { return n, sn } - test.TestDealFlow(t, bp, time.Second, false, false) + test.TestDealFlow(t, bp, time.Second, false, false, 0) t.Run("post-info-all", run) } diff --git a/node/node_test.go b/node/node_test.go index b8009aa78..0baa047da 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -38,17 +38,24 @@ func TestAPIDealFlow(t *testing.T) { logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("storageminer", "ERROR") + blockTime := 10 * time.Millisecond + + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + dealStartEpoch := abi.ChainEpoch(2 << 12) + t.Run("TestDealFlow", func(t *testing.T) { - test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, false, false) + test.TestDealFlow(t, builder.MockSbBuilder, blockTime, false, false, dealStartEpoch) }) t.Run("WithExportedCAR", func(t *testing.T) { - test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, true, false) + test.TestDealFlow(t, builder.MockSbBuilder, blockTime, true, false, dealStartEpoch) }) t.Run("TestDoubleDealFlow", func(t *testing.T) { - test.TestDoubleDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond) + test.TestDoubleDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) t.Run("TestFastRetrievalDealFlow", func(t *testing.T) { - test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond) + test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) } @@ -71,15 +78,15 @@ func TestAPIDealFlowReal(t *testing.T) { }) t.Run("basic", func(t *testing.T) { - test.TestDealFlow(t, builder.Builder, time.Second, false, false) + test.TestDealFlow(t, builder.Builder, time.Second, false, false, 0) }) t.Run("fast-retrieval", func(t *testing.T) { - test.TestDealFlow(t, builder.Builder, time.Second, false, true) + test.TestDealFlow(t, builder.Builder, time.Second, false, true, 0) }) t.Run("retrieval-second", func(t *testing.T) { - test.TestSenondDealRetrieval(t, builder.Builder, time.Second) + test.TestSecondDealRetrieval(t, builder.Builder, time.Second) }) } From 44d929c084e19a78c5d04060864cdf1a16137f28 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 1 Dec 2020 15:31:05 +0100 Subject: [PATCH 4/5] fix: increase proposed deal start epoch for CLI test --- cli/test/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/test/client.go b/cli/test/client.go index 3a2827a65..e56814cff 100644 --- a/cli/test/client.go +++ b/cli/test/client.go @@ -43,13 +43,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode) require.Regexp(t, regexp.MustCompile("Ask:"), out) // Create a deal (non-interactive) - // client deal 1000000attofil + // client deal --start-epoch= 1000000attofil res, _, err := test.CreateClientFile(ctx, clientNode, 1) require.NoError(t, err) + startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12) dataCid := res.Root price := "1000000attofil" duration := fmt.Sprintf("%d", build.MinDealDuration) - out = clientCLI.RunCmd("client", "deal", dataCid.String(), minerAddr.String(), price, duration) + out = clientCLI.RunCmd("client", "deal", startEpoch, dataCid.String(), minerAddr.String(), price, duration) fmt.Println("client deal", out) // Create a deal (interactive) From b7436cd300bd7414ab673621073c119538dd2ecb Mon Sep 17 00:00:00 2001 From: dirkmc Date: Tue, 1 Dec 2020 16:48:48 +0100 Subject: [PATCH 5/5] Update markets/storageadapter/ondealsectorcommitted.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ɓukasz Magiera --- markets/storageadapter/ondealsectorcommitted.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 4980c233c..bfa084638 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -71,7 +71,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // If the deal hasn't been activated by the proposed start epoch, the // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) return false, err }