Merge pull request #5398 from filecoin-project/fix/market-precommit-diff
storageadapter: Look at precommits on-chain since deal publish msg
This commit is contained in:
commit
e77df95157
@ -12,22 +12,22 @@ var log = logging.Logger("markets")
|
|||||||
|
|
||||||
// StorageClientLogger logs events from the storage client
|
// StorageClientLogger logs events from the storage client
|
||||||
func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
|
||||||
log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
|
log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorageProviderLogger logs events from the storage provider
|
// StorageProviderLogger logs events from the storage provider
|
||||||
func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||||
log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
|
log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalClientLogger logs events from the retrieval client
|
// RetrievalClientLogger logs events from the retrieval client
|
||||||
func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
|
||||||
log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
|
log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalProviderLogger logs events from the retrieval provider
|
// RetrievalProviderLogger logs events from the retrieval provider
|
||||||
func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
|
||||||
log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
|
log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DataTransferLogger logs events from the data transfer module
|
// DataTransferLogger logs events from the data transfer module
|
||||||
|
53
markets/storageadapter/api.go
Normal file
53
markets/storageadapter/api.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package storageadapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/adt"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api/apibstore"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type apiWrapper struct {
|
||||||
|
api interface {
|
||||||
|
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
|
||||||
|
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
|
||||||
|
ChainHasObj(context.Context, cid.Cid) (bool, error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ca *apiWrapper) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
|
||||||
|
store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca.api)))
|
||||||
|
|
||||||
|
preAct, err := ca.api.StateGetActor(ctx, actor, pre)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting pre actor: %w", err)
|
||||||
|
}
|
||||||
|
curAct, err := ca.api.StateGetActor(ctx, actor, cur)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting cur actor: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
preSt, err := miner.Load(store, preAct)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("loading miner actor: %w", err)
|
||||||
|
}
|
||||||
|
curSt, err := miner.Load(store, curAct)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("loading miner actor: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
diff, err := miner.DiffPreCommits(preSt, curSt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("diff precommits: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return diff, err
|
||||||
|
}
|
@ -34,9 +34,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type ClientNodeAdapter struct {
|
type ClientNodeAdapter struct {
|
||||||
full.StateAPI
|
*clientApi
|
||||||
full.ChainAPI
|
*apiWrapper
|
||||||
full.MpoolAPI
|
|
||||||
|
|
||||||
fundmgr *market.FundManager
|
fundmgr *market.FundManager
|
||||||
ev *events.Events
|
ev *events.Events
|
||||||
@ -46,14 +45,14 @@ type ClientNodeAdapter struct {
|
|||||||
type clientApi struct {
|
type clientApi struct {
|
||||||
full.ChainAPI
|
full.ChainAPI
|
||||||
full.StateAPI
|
full.StateAPI
|
||||||
|
full.MpoolAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
|
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
|
||||||
capi := &clientApi{chain, stateapi}
|
capi := &clientApi{chain, stateapi, mpool}
|
||||||
return &ClientNodeAdapter{
|
return &ClientNodeAdapter{
|
||||||
StateAPI: stateapi,
|
clientApi: capi,
|
||||||
ChainAPI: chain,
|
apiWrapper: &apiWrapper{api: capi},
|
||||||
MpoolAPI: mpool,
|
|
||||||
|
|
||||||
fundmgr: fundmgr,
|
fundmgr: fundmgr,
|
||||||
ev: events.NewEvents(context.TODO(), capi),
|
ev: events.NewEvents(context.TODO(), capi),
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-state-types/exitcode"
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -18,47 +19,49 @@ type getCurrentDealInfoAPI interface {
|
|||||||
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
|
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
|
||||||
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
|
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
|
||||||
|
|
||||||
|
diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
|
// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
|
||||||
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) {
|
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) {
|
||||||
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
|
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
|
||||||
if dealErr == nil {
|
if dealErr == nil {
|
||||||
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dealID, nil, err
|
return dealID, nil, types.EmptyTSK, err
|
||||||
}
|
}
|
||||||
if equal {
|
if equal {
|
||||||
return dealID, marketDeal, nil
|
return dealID, marketDeal, types.EmptyTSK, nil
|
||||||
}
|
}
|
||||||
dealErr = xerrors.Errorf("Deal proposals did not match")
|
dealErr = xerrors.Errorf("Deal proposals did not match")
|
||||||
}
|
}
|
||||||
if publishCid == nil {
|
if publishCid == nil {
|
||||||
return dealID, nil, dealErr
|
return dealID, nil, types.EmptyTSK, dealErr
|
||||||
}
|
}
|
||||||
// attempt deal id correction
|
// attempt deal id correction
|
||||||
lookup, err := api.StateSearchMsg(ctx, *publishCid)
|
lookup, err := api.StateSearchMsg(ctx, *publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dealID, nil, err
|
return dealID, nil, types.EmptyTSK, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if lookup.Receipt.ExitCode != exitcode.Ok {
|
if lookup.Receipt.ExitCode != exitcode.Ok {
|
||||||
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
|
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
var retval market.PublishStorageDealsReturn
|
var retval market.PublishStorageDealsReturn
|
||||||
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
|
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
|
||||||
return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
|
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(retval.IDs) != 1 {
|
if len(retval.IDs) != 1 {
|
||||||
// market currently only ever sends messages with 1 deal
|
// market currently only ever sends messages with 1 deal
|
||||||
return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
|
return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
|
||||||
}
|
}
|
||||||
|
|
||||||
if retval.IDs[0] == dealID {
|
if retval.IDs[0] == dealID {
|
||||||
// DealID did not change, so we are stuck with the original lookup error
|
// DealID did not change, so we are stuck with the original lookup error
|
||||||
return dealID, nil, dealErr
|
return dealID, nil, lookup.TipSet, dealErr
|
||||||
}
|
}
|
||||||
|
|
||||||
dealID = retval.IDs[0]
|
dealID = retval.IDs[0]
|
||||||
@ -67,13 +70,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dealID, nil, err
|
return dealID, nil, types.EmptyTSK, err
|
||||||
}
|
}
|
||||||
if !equal {
|
if !equal {
|
||||||
return dealID, nil, xerrors.Errorf("Deal proposals did not match")
|
return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dealID, marketDeal, err
|
return dealID, marketDeal, lookup.TipSet, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {
|
func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-state-types/exitcode"
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -209,7 +210,7 @@ func TestGetCurrentDealInfo(t *testing.T) {
|
|||||||
MarketDeals: marketDeals,
|
MarketDeals: marketDeals,
|
||||||
}
|
}
|
||||||
|
|
||||||
dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
|
dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
|
||||||
require.Equal(t, data.expectedDealID, dealID)
|
require.Equal(t, data.expectedDealID, dealID)
|
||||||
require.Equal(t, data.expectedMarketDeal, marketDeal)
|
require.Equal(t, data.expectedMarketDeal, marketDeal)
|
||||||
if data.expectedError == nil {
|
if data.expectedError == nil {
|
||||||
@ -236,6 +237,10 @@ type mockGetCurrentDealInfoAPI struct {
|
|||||||
MarketDeals map[marketDealKey]*api.MarketDeal
|
MarketDeals map[marketDealKey]*api.MarketDeal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
|
||||||
|
return &miner.PreCommitChanges{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
|
func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
|
||||||
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
|
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -5,16 +5,18 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type sectorCommittedEventsAPI interface {
|
type sectorCommittedEventsAPI interface {
|
||||||
@ -32,7 +34,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
|||||||
|
|
||||||
// First check if the deal is already active, and if so, bail out
|
// First check if the deal is already active, and if so, bail out
|
||||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Note: the error returned from here will end up being returned
|
// Note: the error returned from here will end up being returned
|
||||||
// from OnDealSectorPreCommitted so no need to call the callback
|
// from OnDealSectorPreCommitted so no need to call the callback
|
||||||
@ -46,6 +48,36 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
|||||||
return true, false, nil
|
return true, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that precommits which landed between when the deal was published
|
||||||
|
// and now don't already contain the deal we care about.
|
||||||
|
// (this can happen when the precommit lands vary quickly (in tests), or
|
||||||
|
// when the client node was down after the deal was published, and when
|
||||||
|
// the precommit containing it landed on chain)
|
||||||
|
|
||||||
|
if publishTs == types.EmptyTSK {
|
||||||
|
lookup, err := api.StateSearchMsg(ctx, *publishCid)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
if lookup != nil { // can be nil in tests
|
||||||
|
publishTs = lookup.TipSet
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range diff.Added {
|
||||||
|
for _, d := range info.Info.DealIDs {
|
||||||
|
if d == di {
|
||||||
|
cb(info.Info.SectorNumber, false, nil)
|
||||||
|
return true, false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Not yet active, start matching against incoming messages
|
// Not yet active, start matching against incoming messages
|
||||||
return false, true, nil
|
return false, true, nil
|
||||||
}
|
}
|
||||||
@ -88,7 +120,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
|
|||||||
|
|
||||||
// When the deal is published, the deal ID may change, so get the
|
// When the deal is published, the deal ID may change, so get the
|
||||||
// current deal ID from the publish message CID
|
// current deal ID from the publish message CID
|
||||||
dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -130,7 +162,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
|||||||
|
|
||||||
// First check if the deal is already active, and if so, bail out
|
// First check if the deal is already active, and if so, bail out
|
||||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
_, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Note: the error returned from here will end up being returned
|
// Note: the error returned from here will end up being returned
|
||||||
// from OnDealSectorCommitted so no need to call the callback
|
// from OnDealSectorCommitted so no need to call the callback
|
||||||
@ -186,7 +218,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the deal info
|
// Get the deal info
|
||||||
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
_, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||||
}
|
}
|
||||||
@ -216,22 +248,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) {
|
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) {
|
||||||
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: This may be fine for some errors
|
// TODO: This may be fine for some errors
|
||||||
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sector with deal is already active
|
// Sector with deal is already active
|
||||||
if sd.State.SectorStartEpoch > 0 {
|
if sd.State.SectorStartEpoch > 0 {
|
||||||
return true, nil
|
return 0, true, publishTs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sector was slashed
|
// Sector was slashed
|
||||||
if sd.State.SlashEpoch > 0 {
|
if sd.State.SlashEpoch > 0 {
|
||||||
return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
|
return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return di, false, publishTs, nil
|
||||||
}
|
}
|
||||||
|
@ -161,8 +161,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
|
|||||||
deals: map[abi.DealID]*api.MarketDeal{},
|
deals: map[abi.DealID]*api.MarketDeal{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedCBCallCount: 1,
|
expectedCBCallCount: 0,
|
||||||
expectedCBError: errors.New("handling applied event: something went wrong"),
|
|
||||||
expectedError: errors.New("failed to set up called handler: something went wrong"),
|
expectedError: errors.New("failed to set up called handler: something went wrong"),
|
||||||
},
|
},
|
||||||
"proposed deal epoch timeout": {
|
"proposed deal epoch timeout": {
|
||||||
|
@ -41,6 +41,7 @@ var log = logging.Logger("storageadapter")
|
|||||||
|
|
||||||
type ProviderNodeAdapter struct {
|
type ProviderNodeAdapter struct {
|
||||||
api.FullNode
|
api.FullNode
|
||||||
|
*apiWrapper
|
||||||
|
|
||||||
// this goes away with the data transfer module
|
// this goes away with the data transfer module
|
||||||
dag dtypes.StagingDAG
|
dag dtypes.StagingDAG
|
||||||
@ -56,6 +57,7 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA
|
|||||||
return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
||||||
na := &ProviderNodeAdapter{
|
na := &ProviderNodeAdapter{
|
||||||
FullNode: full,
|
FullNode: full,
|
||||||
|
apiWrapper: &apiWrapper{api: full},
|
||||||
|
|
||||||
dag: dag,
|
dag: dag,
|
||||||
secb: secb,
|
secb: secb,
|
||||||
|
Loading…
Reference in New Issue
Block a user