Merge pull request #5061 from filecoin-project/fix/err-late-deals

Error out deals that are not activated by proposed deal start epoch
This commit is contained in:
Łukasz Magiera 2020-12-01 17:02:06 +01:00 committed by GitHub
commit 82b5cb89cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 145 additions and 77 deletions

View File

@ -89,7 +89,7 @@ func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeH
t.Fatal(err)
}
MakeDeal(t, ctx, 6, client, miner, false, false)
MakeDeal(t, ctx, 6, client, miner, false, false, 0)
// Validate upgrade

View File

@ -12,6 +12,8 @@ import (
"testing"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/stretchr/testify/require"
"github.com/ipfs/go-cid"
@ -31,7 +33,7 @@ import (
ipld "github.com/ipfs/go-ipld-format"
)
func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool) {
func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
@ -60,14 +62,14 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
}
}()
MakeDeal(t, ctx, 6, client, miner, carExport, fastRet)
MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch)
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
@ -97,15 +99,15 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
}
}()
MakeDeal(t, ctx, 6, client, miner, false, false)
MakeDeal(t, ctx, 7, client, miner, false, false)
MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch)
MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch)
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool) {
func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
res, data, err := CreateClientFile(ctx, client, rseed)
if err != nil {
t.Fatal(err)
@ -114,7 +116,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
fcid := res.Root
fmt.Println("FILE CID: ", fcid)
deal := startDeal(t, ctx, miner, client, fcid, fastRet)
deal := startDeal(t, ctx, miner, client, fcid, fastRet, startEpoch)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
@ -149,7 +151,7 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
@ -189,7 +191,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati
fmt.Println("FILE CID: ", fcid)
deal := startDeal(t, ctx, miner, client, fcid, true)
deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch)
waitDealPublished(t, ctx, miner, deal)
fmt.Println("deal published, retrieving")
@ -203,7 +205,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati
<-done
}
func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
@ -252,13 +254,13 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
t.Fatal(err)
}
deal1 := startDeal(t, ctx, miner, client, fcid1, true)
deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0)
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
@ -278,7 +280,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
<-done
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool) *cid.Cid {
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
@ -296,6 +298,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
Wallet: addr,
Miner: maddr,
EpochPrice: types.NewInt(1000000),
DealStartEpoch: startEpoch,
MinBlocksDuration: uint64(build.MinDealDuration),
FastRetrieval: fastRet,
})

View File

@ -186,7 +186,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
}
}()
deal := startDeal(t, ctx, provider, client, fcid, false)
deal := startDeal(t, ctx, provider, client, fcid, false, 0)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)

View File

@ -300,7 +300,11 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts
if err != nil {
return nil, err
} else if !found {
return nil, xerrors.Errorf("deal %d not found", dealID)
return nil, xerrors.Errorf(
"deal %d not found "+
"- deal may not have completed sealing before deal proposal "+
"start epoch, or deal may have been slashed",
dealID)
}
states, err := state.States()

View File

@ -43,13 +43,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
require.Regexp(t, regexp.MustCompile("Ask:"), out)
// Create a deal (non-interactive)
// client deal <cid> <miner addr> 1000000attofil <duration>
// client deal --start-epoch=<start epoch> <cid> <miner addr> 1000000attofil <duration>
res, _, err := test.CreateClientFile(ctx, clientNode, 1)
require.NoError(t, err)
startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
dataCid := res.Root
price := "1000000attofil"
duration := fmt.Sprintf("%d", build.MinDealDuration)
out = clientCLI.RunCmd("client", "deal", dataCid.String(), minerAddr.String(), price, duration)
out = clientCLI.RunCmd("client", "deal", startEpoch, dataCid.String(), minerAddr.String(), price, duration)
fmt.Println("client deal", out)
// Create a deal (interactive)

View File

@ -171,7 +171,11 @@ func TestDealFlow(t *testing.T) {
nodes := startNodesWithFunds(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit)
defer nodes.closer()
test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false)
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)
test.MakeDeal(t, ctx, 6, nodes.lite, nodes.miner, false, false, dealStartEpoch)
}
func TestCLIDealFlow(t *testing.T) {

View File

@ -70,7 +70,7 @@ func TestMinerAllInfo(t *testing.T) {
return n, sn
}
test.TestDealFlow(t, bp, time.Second, false, false)
test.TestDealFlow(t, bp, time.Second, false, false, 0)
t.Run("post-info-all", run)
}

View File

@ -3,6 +3,7 @@ package storageadapter
import (
"bytes"
"context"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -20,7 +21,15 @@ type sectorCommittedEventsAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) {
once.Do(func() {
callback(sectorNumber, isActive, err)
})
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
@ -47,6 +56,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return matched, nil
}
// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1
// Check if the message params included the deal ID we're looking for.
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
@ -55,9 +68,11 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
}
}()
// Check if waiting for pre-commit timed out
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal %d pre-commit", dealID)
err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
}
// Extract the message parameters
@ -92,14 +107,22 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return nil
}
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
return nil
}
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(err error) {
once.Do(func() {
callback(err)
})
}
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
@ -134,6 +157,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return params.SectorNumber == sectorNumber, nil
}
// The deal must be accepted by the deal proposal start epoch, so timeout
// if the chain reaches that epoch
timeoutEpoch := proposal.StartEpoch + 1
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
defer func() {
if err != nil {
@ -141,9 +168,11 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
}
}()
// Check if waiting for prove-commit timed out
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
return false, xerrors.Errorf("timed out waiting for deal activation for deal %d", dealID)
err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
return false, err
}
// Get the deal info
@ -170,7 +199,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil
}
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), events.NoTimeout, matchEvent); err != nil {
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
@ -185,6 +214,14 @@ func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts
}
// Sector with deal is already active
isActive := sd.State.SectorStartEpoch > 0
return isActive, nil
if sd.State.SectorStartEpoch > 0 {
return true, nil
}
// Sector was slashed
if sd.State.SlashEpoch > 0 {
return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
}
return false, nil
}

View File

@ -8,6 +8,8 @@ import (
"math/rand"
"testing"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/filecoin-project/go-address"
@ -53,17 +55,19 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
LastUpdatedEpoch: 2,
},
}
testCases := map[string]struct {
type testCase struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
dealStartEpochTimeout bool
expectedCBCallCount uint64
expectedCBSectorNumber abi.SectorNumber
expectedCBIsActive bool
expectedCBError error
expectedError error
}{
}
testCases := map[string]testCase{
"normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
@ -142,18 +146,16 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: something went wrong"),
expectedError: errors.New("failed to set up called handler: something went wrong"),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
},
}
runTestCase := func(testCase string, data struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
expectedCBCallCount uint64
expectedCBSectorNumber abi.SectorNumber
expectedCBIsActive bool
expectedCBError error
expectedError error
}) {
runTestCase := func(testCase string, data testCase) {
t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()
@ -185,6 +187,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
Ctx: ctx,
CheckTs: checkTs,
MatchMessages: matchMessages,
DealStartEpochTimeout: data.dealStartEpochTimeout,
}
cbCallCount := uint64(0)
var cbSectorNumber abi.SectorNumber
@ -245,15 +248,17 @@ func TestOnDealSectorCommitted(t *testing.T) {
LastUpdatedEpoch: 2,
},
}
testCases := map[string]struct {
type testCase struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
dealStartEpochTimeout bool
expectedCBCallCount uint64
expectedCBError error
expectedError error
}{
}
testCases := map[string]testCase{
"normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
@ -323,16 +328,16 @@ func TestOnDealSectorCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"),
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
},
}
runTestCase := func(testCase string, data struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
expectedCBCallCount uint64
expectedCBError error
expectedError error
}) {
runTestCase := func(testCase string, data testCase) {
t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()
@ -364,6 +369,7 @@ func TestOnDealSectorCommitted(t *testing.T) {
Ctx: ctx,
CheckTs: checkTs,
MatchMessages: matchMessages,
DealStartEpochTimeout: data.dealStartEpochTimeout,
}
cbCallCount := uint64(0)
var cbError error
@ -406,9 +412,15 @@ type fakeEvents struct {
Ctx context.Context
CheckTs *types.TipSet
MatchMessages []matchMessage
DealStartEpochTimeout bool
}
func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
if fe.DealStartEpochTimeout {
msgHnd(nil, nil, nil, 100) // nolint:errcheck
return nil
}
_, more, err := check(fe.CheckTs)
if err != nil {
return err

View File

@ -38,17 +38,24 @@ func TestAPIDealFlow(t *testing.T) {
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
blockTime := 10 * time.Millisecond
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)
t.Run("TestDealFlow", func(t *testing.T) {
test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, false, false)
test.TestDealFlow(t, builder.MockSbBuilder, blockTime, false, false, dealStartEpoch)
})
t.Run("WithExportedCAR", func(t *testing.T) {
test.TestDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond, true, false)
test.TestDealFlow(t, builder.MockSbBuilder, blockTime, true, false, dealStartEpoch)
})
t.Run("TestDoubleDealFlow", func(t *testing.T) {
test.TestDoubleDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond)
test.TestDoubleDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
t.Run("TestFastRetrievalDealFlow", func(t *testing.T) {
test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, 10*time.Millisecond)
test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
}
@ -71,15 +78,15 @@ func TestAPIDealFlowReal(t *testing.T) {
})
t.Run("basic", func(t *testing.T) {
test.TestDealFlow(t, builder.Builder, time.Second, false, false)
test.TestDealFlow(t, builder.Builder, time.Second, false, false, 0)
})
t.Run("fast-retrieval", func(t *testing.T) {
test.TestDealFlow(t, builder.Builder, time.Second, false, true)
test.TestDealFlow(t, builder.Builder, time.Second, false, true, 0)
})
t.Run("retrieval-second", func(t *testing.T) {
test.TestSenondDealRetrieval(t, builder.Builder, time.Second)
test.TestSecondDealRetrieval(t, builder.Builder, time.Second)
})
}