feat: error out deals that are not activated by proposed deal start epoch

This commit is contained in:
Dirk McCormick 2020-11-30 15:16:13 +01:00
parent 660ac26dd9
commit e02a366419
3 changed files with 166 additions and 49 deletions

View File

@ -300,7 +300,11 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts
if err != nil { if err != nil {
return nil, err return nil, err
} else if !found { } else if !found {
return nil, xerrors.Errorf("deal %d not found", dealID) return nil, xerrors.Errorf(
"deal %d not found "+
"- deal may not have completed sealing before deal proposal "+
"start epoch, or deal may have been slashed",
dealID)
} }
states, err := state.States() states, err := state.States()

View File

@ -3,10 +3,12 @@ package storageadapter
import ( import (
"bytes" "bytes"
"context" "context"
"sync"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -18,12 +20,22 @@ import (
type sectorCommittedEventsAPI interface { type sectorCommittedEventsAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error
} }
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) {
once.Do(func() {
callback(sectorNumber, isActive, err)
})
}
// First check if the deal is already active, and if so, bail out // First check if the deal is already active, and if so, bail out
var proposedDealStartEpoch abi.ChainEpoch
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil { if err != nil {
// Note: the error returned from here will end up being returned // Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback // from OnDealSectorPreCommitted so no need to call the callback
@ -37,6 +49,10 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return true, false, nil return true, false, nil
} }
// Save the proposed deal start epoch so we can timeout if the deal
// hasn't been activated by that epoch
proposedDealStartEpoch = deal.Proposal.StartEpoch
// Not yet active, start matching against incoming messages // Not yet active, start matching against incoming messages
return false, true, nil return false, true, nil
} }
@ -96,13 +112,28 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return xerrors.Errorf("failed to set up called handler: %w", err) return xerrors.Errorf("failed to set up called handler: %w", err)
} }
// If the deal hasn't been activated by the proposed start epoch, timeout
// the deal
timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) {
cb(0, false, err)
})
return nil return nil
} }
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(err error) {
once.Do(func() {
callback(err)
})
}
// First check if the deal is already active, and if so, bail out // First check if the deal is already active, and if so, bail out
var proposedDealStartEpoch abi.ChainEpoch
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) deal, isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil { if err != nil {
// Note: the error returned from here will end up being returned // Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback // from OnDealSectorCommitted so no need to call the callback
@ -116,6 +147,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return true, false, nil return true, false, nil
} }
// Save the proposed deal start epoch so we can timeout if the deal
// hasn't been activated by that epoch
proposedDealStartEpoch = deal.Proposal.StartEpoch
// Not yet active, start matching against incoming messages // Not yet active, start matching against incoming messages
return false, true, nil return false, true, nil
} }
@ -174,17 +209,57 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return xerrors.Errorf("failed to set up called handler: %w", err) return xerrors.Errorf("failed to set up called handler: %w", err)
} }
// If the deal hasn't been activated by the proposed start epoch, timeout
// the deal
timeoutOnProposedStartEpoch(dealID, proposedDealStartEpoch, eventsApi, func(err error) {
cb(err)
})
return nil return nil
} }
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (*api.MarketDeal, bool, error) {
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil { if err != nil {
// TODO: This may be fine for some errors // TODO: This may be fine for some errors
return false, xerrors.Errorf("failed to look up deal on chain: %w", err) return nil, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
} }
// Sector with deal is already active // Sector with deal is already active
isActive := sd.State.SectorStartEpoch > 0 if sd.State.SectorStartEpoch > 0 {
return isActive, nil return nil, true, nil
}
// Sector was slashed
if sd.State.SlashEpoch > 0 {
return nil, false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
}
return sd, false, nil
}
// Once the chain reaches the proposed deal start epoch, callback with an error.
// Note that the functions that call timeoutOnProposedStartEpoch will ignore
// the callback if it's already been called (ie if a pre-commit or commit
// message lands on chain before the proposed deal start epoch).
func timeoutOnProposedStartEpoch(dealID abi.DealID, proposedDealStartEpoch abi.ChainEpoch, api sectorCommittedEventsAPI, cb func(err error)) {
// Called when the chain height reaches deal start epoch + confidence
heightAt := func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
cb(xerrors.Errorf("deal %d was not activated by deal start epoch %d", dealID, proposedDealStartEpoch))
return nil
}
// If the chain reorgs after reaching the deal start epoch, it's very
// unlikely to reorg in such a way that the deal changes from
// "not activated" to "activated before deal start epoch", so just log a
// warning.
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warnf("deal %d had reached start epoch %d but the chain reorged", dealID, proposedDealStartEpoch)
return nil
}
err := api.ChainAt(heightAt, revert, int(build.MessageConfidence+1), proposedDealStartEpoch+1)
if err != nil {
cb(xerrors.Errorf("error waiting for deal %d to become activated: %w", dealID, err))
}
} }

View File

@ -8,6 +8,8 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -53,17 +55,20 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
LastUpdatedEpoch: 2, LastUpdatedEpoch: 2,
}, },
} }
testCases := map[string]struct { type testCase struct {
searchMessageLookup *api.MsgLookup searchMessageLookup *api.MsgLookup
searchMessageErr error searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState matchStates []matchState
chainAtErr error
dealStartEpochTimeout bool
expectedCBCallCount uint64 expectedCBCallCount uint64
expectedCBSectorNumber abi.SectorNumber expectedCBSectorNumber abi.SectorNumber
expectedCBIsActive bool expectedCBIsActive bool
expectedCBError error expectedCBError error
expectedError error expectedError error
}{ }
testCases := map[string]testCase{
"normal sequence": { "normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{ checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal, startDealID: unfinishedDeal,
@ -142,18 +147,24 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: something went wrong"), expectedCBError: errors.New("handling applied event: something went wrong"),
expectedError: errors.New("failed to set up called handler: something went wrong"), expectedError: errors.New("failed to set up called handler: something went wrong"),
}, },
"chainAt error": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
chainAtErr: errors.New("chain at err"),
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID),
},
} }
runTestCase := func(testCase string, data struct { runTestCase := func(testCase string, data testCase) {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
expectedCBCallCount uint64
expectedCBSectorNumber abi.SectorNumber
expectedCBIsActive bool
expectedCBError error
expectedError error
}) {
t.Run(testCase, func(t *testing.T) { t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel() // defer cancel()
@ -182,9 +193,11 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
} }
} }
eventsAPI := &fakeEvents{ eventsAPI := &fakeEvents{
Ctx: ctx, Ctx: ctx,
CheckTs: checkTs, CheckTs: checkTs,
MatchMessages: matchMessages, MatchMessages: matchMessages,
ChainAtErr: data.chainAtErr,
DealStartEpochTimeout: data.dealStartEpochTimeout,
} }
cbCallCount := uint64(0) cbCallCount := uint64(0)
var cbSectorNumber abi.SectorNumber var cbSectorNumber abi.SectorNumber
@ -245,15 +258,18 @@ func TestOnDealSectorCommitted(t *testing.T) {
LastUpdatedEpoch: 2, LastUpdatedEpoch: 2,
}, },
} }
testCases := map[string]struct { type testCase struct {
searchMessageLookup *api.MsgLookup searchMessageLookup *api.MsgLookup
searchMessageErr error searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState matchStates []matchState
expectedCBCallCount uint64 chainAtErr error
expectedCBError error dealStartEpochTimeout bool
expectedError error expectedCBCallCount uint64
}{ expectedCBError error
expectedError error
}
testCases := map[string]testCase{
"normal sequence": { "normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{ checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal, startDealID: unfinishedDeal,
@ -323,16 +339,24 @@ func TestOnDealSectorCommitted(t *testing.T) {
expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"),
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
}, },
"chainAt error": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
chainAtErr: errors.New("chain at err"),
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("error waiting for deal %d to become activated: chain at err", startDealID),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("deal %d was not activated by deal start epoch 0", startDealID),
},
} }
runTestCase := func(testCase string, data struct { runTestCase := func(testCase string, data testCase) {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
matchStates []matchState
expectedCBCallCount uint64
expectedCBError error
expectedError error
}) {
t.Run(testCase, func(t *testing.T) { t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel() // defer cancel()
@ -361,9 +385,11 @@ func TestOnDealSectorCommitted(t *testing.T) {
} }
} }
eventsAPI := &fakeEvents{ eventsAPI := &fakeEvents{
Ctx: ctx, Ctx: ctx,
CheckTs: checkTs, CheckTs: checkTs,
MatchMessages: matchMessages, MatchMessages: matchMessages,
ChainAtErr: data.chainAtErr,
DealStartEpochTimeout: data.dealStartEpochTimeout,
} }
cbCallCount := uint64(0) cbCallCount := uint64(0)
var cbError error var cbError error
@ -403,9 +429,11 @@ type matchMessage struct {
doesRevert bool doesRevert bool
} }
type fakeEvents struct { type fakeEvents struct {
Ctx context.Context Ctx context.Context
CheckTs *types.TipSet CheckTs *types.TipSet
MatchMessages []matchMessage MatchMessages []matchMessage
ChainAtErr error
DealStartEpochTimeout bool
} }
func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error { func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error {
@ -440,6 +468,16 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
return nil return nil
} }
func (fe *fakeEvents) ChainAt(hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error {
if fe.ChainAtErr != nil {
return fe.ChainAtErr
}
if fe.DealStartEpochTimeout {
_ = hnd(context.Background(), nil, abi.ChainEpoch(0))
}
return nil
}
func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message { func makeMessage(t *testing.T, to address.Address, method abi.MethodNum, params cbor.Marshaler) *types.Message {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err := params.MarshalCBOR(buf) err := params.MarshalCBOR(buf)