diff --git a/api/test/ccupgrade.go b/api/test/ccupgrade.go index 4f6b39701..eedcec6ca 100644 --- a/api/test/ccupgrade.go +++ b/api/test/ccupgrade.go @@ -89,7 +89,7 @@ func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeH t.Fatal(err) } - MakeDeal(t, ctx, 6, client, miner, false, false) + MakeDeal(t, ctx, 6, client, miner, false, false, 0) // Validate upgrade diff --git a/api/test/deals.go b/api/test/deals.go index d2cabadc6..1189f070e 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/filecoin-project/go-state-types/abi" + "github.com/stretchr/testify/require" "github.com/ipfs/go-cid" @@ -31,7 +33,7 @@ import ( ipld "github.com/ipfs/go-ipld-format" ) -func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool) { +func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -60,14 +62,14 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport } }() - MakeDeal(t, ctx, 6, client, miner, carExport, fastRet) + MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch) atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } -func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -97,15 +99,15 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { } }() - MakeDeal(t, ctx, 6, client, miner, false, false) - MakeDeal(t, ctx, 7, client, miner, false, false) + MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch) + MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch) atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } -func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool) { +func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) { res, data, err := CreateClientFile(ctx, client, rseed) if err != nil { t.Fatal(err) @@ -114,7 +116,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, fcid := res.Root fmt.Println("FILE CID: ", fcid) - deal := startDeal(t, ctx, miner, client, fcid, fastRet) + deal := startDeal(t, ctx, miner, client, fcid, fastRet, startEpoch) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) @@ -149,7 +151,7 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api return res, data, nil } -func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -189,7 +191,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati fmt.Println("FILE CID: ", fcid) - deal := startDeal(t, ctx, miner, client, fcid, true) + deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch) waitDealPublished(t, ctx, miner, deal) fmt.Println("deal published, retrieving") @@ -203,7 +205,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati <-done } -func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { +func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { ctx := context.Background() n, sn := b(t, OneFull, OneMiner) @@ -252,13 +254,13 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration t.Fatal(err) } - deal1 := startDeal(t, ctx, miner, client, fcid1, true) + deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) waitDealSealed(t, ctx, miner, client, deal1, true) - deal2 := startDeal(t, ctx, miner, client, fcid2, true) + deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0) time.Sleep(time.Second) waitDealSealed(t, ctx, miner, client, deal2, false) @@ -278,7 +280,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration <-done } -func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool) *cid.Cid { +func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid { maddr, err := miner.ActorAddress(ctx) if err != nil { t.Fatal(err) @@ -296,6 +298,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client Wallet: addr, Miner: maddr, EpochPrice: types.NewInt(1000000), + DealStartEpoch: startEpoch, MinBlocksDuration: uint64(build.MinDealDuration), FastRetrieval: fastRet, }) diff --git a/api/test/mining.go b/api/test/mining.go index 11953b95d..8f3689333 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -186,7 +186,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo } }() - deal := startDeal(t, ctx, provider, client, fcid, false) + deal := startDeal(t, ctx, provider, client, fcid, false, 0) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index fb0b91378..1e29e72d8 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -300,7 +300,11 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts if err != nil { return nil, err } else if !found { - return nil, xerrors.Errorf("deal %d not found", dealID) + return nil, xerrors.Errorf( + "deal %d not found "+ + "- deal may not have completed sealing before deal proposal "+ + "start epoch, or deal may have been slashed", + dealID) } states, err := state.States() diff --git a/cli/test/client.go b/cli/test/client.go index b75e13025..4a49f732a 100644 --- a/cli/test/client.go +++ b/cli/test/client.go @@ -43,13 +43,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode) require.Regexp(t, regexp.MustCompile("Ask:"), out) // Create a deal (non-interactive) - // client deal 1000000attofil + // client deal --start-epoch= 1000000attofil res, _, err := test.CreateClientFile(ctx, clientNode, 1) require.NoError(t, err) + startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12) dataCid := res.Root price := "1000000attofil" duration := fmt.Sprintf("%d", build.MinDealDuration) - out = clientCLI.RunCmd("client", "deal", dataCid.String(), minerAddr.String(), price, duration) + out = clientCLI.RunCmd("client", "deal", startEpoch, dataCid.String(), minerAddr.String(), price, duration) fmt.Println("client deal", out) // Create a deal (interactive) diff --git a/cmd/lotus-gateway/endtoend_test.go b/cmd/lotus-gateway/endtoend_test.go index f0b950f5e..4d5e88c82 100644 --- a/cmd/lotus-gateway/endtoend_test.go +++ b/cmd/lotus-gateway/endtoend_test.go @@ -171,7 +171,11 @@ func TestDealFlow(t *testing.T) { nodes := startNodesWithFunds(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit) defer nodes.closer() - test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false) + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + dealStartEpoch := abi.ChainEpoch(2 << 12) + test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false, dealStartEpoch) } func TestCLIDealFlow(t *testing.T) { diff --git a/cmd/lotus-storage-miner/allinfo_test.go b/cmd/lotus-storage-miner/allinfo_test.go index a458c024b..51aba14a9 100644 --- a/cmd/lotus-storage-miner/allinfo_test.go +++ b/cmd/lotus-storage-miner/allinfo_test.go @@ -70,7 +70,7 @@ func TestMinerAllInfo(t *testing.T) { return n, sn } - test.TestDealFlow(t, bp, time.Second, false, false) + test.TestDealFlow(t, bp, time.Second, false, false, 0) t.Run("post-info-all", run) } diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 85cc8f97b..bfa084638 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -3,6 +3,7 @@ package storageadapter import ( "bytes" "context" + "sync" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -20,7 +21,15 @@ type sectorCommittedEventsAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error } -func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { +func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { + // Ensure callback is only called once + var once sync.Once + cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) { + once.Do(func() { + callback(sectorNumber, isActive, err) + }) + } + // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) @@ -47,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return matched, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + // Check if the message params included the deal ID we're looking for. called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { @@ -55,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } }() - // Check if waiting for pre-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID) + err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Extract the message parameters @@ -92,14 +107,22 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } return nil } -func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { +func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { + // Ensure callback is only called once + var once sync.Once + cb := func(err error) { + once.Do(func() { + callback(err) + }) + } + // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) @@ -134,6 +157,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return params.SectorNumber == sectorNumber, nil } + // The deal must be accepted by the deal proposal start epoch, so timeout + // if the chain reaches that epoch + timeoutEpoch := proposal.StartEpoch + 1 + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) { defer func() { if err != nil { @@ -141,9 +168,11 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } }() - // Check if waiting for prove-commit timed out + // If the deal hasn't been activated by the proposed start epoch, the + // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID) + err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + return false, err } // Get the deal info @@ -170,7 +199,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil { + if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } @@ -185,6 +214,14 @@ func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts } // Sector with deal is already active - isActive := sd.State.SectorStartEpoch > 0 - return isActive, nil + if sd.State.SectorStartEpoch > 0 { + return true, nil + } + + // Sector was slashed + if sd.State.SlashEpoch > 0 { + return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + } + + return false, nil } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index 0ffc2ff3e..30fbfea76 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -8,6 +8,8 @@ import ( "math/rand" "testing" + "golang.org/x/xerrors" + blocks "github.com/ipfs/go-block-format" "github.com/filecoin-project/go-address" @@ -53,17 +55,19 @@ func TestOnDealSectorPreCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - testCases := map[string]struct { + type testCase struct { searchMessageLookup *api.MsgLookup searchMessageErr error checkTsDeals map[abi.DealID]*api.MarketDeal matchStates []matchState + dealStartEpochTimeout bool expectedCBCallCount uint64 expectedCBSectorNumber abi.SectorNumber expectedCBIsActive bool expectedCBError error expectedError error - }{ + } + testCases := map[string]testCase{ "normal sequence": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, @@ -142,18 +146,16 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"), }, + "proposed deal epoch timeout": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + dealStartEpochTimeout: true, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + }, } - runTestCase := func(testCase string, data struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBSectorNumber abi.SectorNumber - expectedCBIsActive bool - expectedCBError error - expectedError error - }) { + runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // defer cancel() @@ -182,9 +184,10 @@ func TestOnDealSectorPreCommitted(t *testing.T) { } } eventsAPI := &fakeEvents{ - Ctx: ctx, - CheckTs: checkTs, - MatchMessages: matchMessages, + Ctx: ctx, + CheckTs: checkTs, + MatchMessages: matchMessages, + DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) var cbSectorNumber abi.SectorNumber @@ -245,15 +248,17 @@ func TestOnDealSectorCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - testCases := map[string]struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBError error - expectedError error - }{ + type testCase struct { + searchMessageLookup *api.MsgLookup + searchMessageErr error + checkTsDeals map[abi.DealID]*api.MarketDeal + matchStates []matchState + dealStartEpochTimeout bool + expectedCBCallCount uint64 + expectedCBError error + expectedError error + } + testCases := map[string]testCase{ "normal sequence": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, @@ -323,16 +328,16 @@ func TestOnDealSectorCommitted(t *testing.T) { expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), }, + "proposed deal epoch timeout": { + checkTsDeals: map[abi.DealID]*api.MarketDeal{ + startDealID: unfinishedDeal, + }, + dealStartEpochTimeout: true, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + }, } - runTestCase := func(testCase string, data struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal - matchStates []matchState - expectedCBCallCount uint64 - expectedCBError error - expectedError error - }) { + runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // defer cancel() @@ -361,9 +366,10 @@ func TestOnDealSectorCommitted(t *testing.T) { } } eventsAPI := &fakeEvents{ - Ctx: ctx, - CheckTs: checkTs, - MatchMessages: matchMessages, + Ctx: ctx, + CheckTs: checkTs, + MatchMessages: matchMessages, + DealStartEpochTimeout: data.dealStartEpochTimeout, } cbCallCount := uint64(0) var cbError error @@ -403,12 +409,18 @@ type matchMessage struct { doesRevert bool } type fakeEvents struct { - Ctx context.Context - CheckTs *types.TipSet - MatchMessages []matchMessage + Ctx context.Context + CheckTs *types.TipSet + MatchMessages []matchMessage + DealStartEpochTimeout bool } func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { + if fe.DealStartEpochTimeout { + msgHnd(nil, nil, nil, 100) // nolint:errcheck + return nil + } + _, more, err := check(fe.CheckTs) if err != nil { return err diff --git a/node/node_test.go b/node/node_test.go index b8009aa78..0baa047da 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -38,17 +38,24 @@ func TestAPIDealFlow(t *testing.T) { logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("storageminer", "ERROR") + blockTime := 10 * time.Millisecond + + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + dealStartEpoch := abi.ChainEpoch(2 << 12) + t.Run("TestDealFlow", func(t *testing.T) { - test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, false, false) + test.TestDealFlow(t, builder.MockSbBuilder, blockTime, false, false, dealStartEpoch) }) t.Run("WithExportedCAR", func(t *testing.T) { - test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, true, false) + test.TestDealFlow(t, builder.MockSbBuilder, blockTime, true, false, dealStartEpoch) }) t.Run("TestDoubleDealFlow", func(t *testing.T) { - test.TestDoubleDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond) + test.TestDoubleDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) t.Run("TestFastRetrievalDealFlow", func(t *testing.T) { - test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond) + test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) } @@ -71,15 +78,15 @@ func TestAPIDealFlowReal(t *testing.T) { }) t.Run("basic", func(t *testing.T) { - test.TestDealFlow(t, builder.Builder, time.Second, false, false) + test.TestDealFlow(t, builder.Builder, time.Second, false, false, 0) }) t.Run("fast-retrieval", func(t *testing.T) { - test.TestDealFlow(t, builder.Builder, time.Second, false, true) + test.TestDealFlow(t, builder.Builder, time.Second, false, true, 0) }) t.Run("retrieval-second", func(t *testing.T) { - test.TestSenondDealRetrieval(t, builder.Builder, time.Second) + test.TestSecondDealRetrieval(t, builder.Builder, time.Second) }) }