storageadapter: Look at precommits on-chain since deal publish msg

This commit is contained in:
Łukasz Magiera 2021-01-21 11:54:32 +01:00
parent e5efe57c90
commit 42b481fb61
4 changed files with 96 additions and 29 deletions

View File

@ -12,22 +12,22 @@ var log = logging.Logger("markets")
// StorageClientLogger logs events from the storage client
func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
}
// StorageProviderLogger logs events from the storage provider
func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
}
// RetrievalClientLogger logs events from the retrieval client
func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}
// RetrievalProviderLogger logs events from the retrieval provider
func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}
// DataTransferLogger logs events from the data transfer module

View File

@ -18,47 +18,51 @@ type getCurrentDealInfoAPI interface {
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
}
// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) {
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) {
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
if dealErr == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}
if equal {
return dealID, marketDeal, nil
return dealID, marketDeal, types.EmptyTSK, nil
}
dealErr = xerrors.Errorf("Deal proposals did not match")
}
if publishCid == nil {
return dealID, nil, dealErr
return dealID, nil, types.EmptyTSK, dealErr
}
// attempt deal id correction
lookup, err := api.StateSearchMsg(ctx, *publishCid)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}
if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
}
var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}
if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}
if retval.IDs[0] == dealID {
// DealID did not change, so we are stuck with the original lookup error
return dealID, nil, dealErr
return dealID, nil, lookup.TipSet, dealErr
}
dealID = retval.IDs[0]
@ -67,13 +71,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea
if err == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}
if !equal {
return dealID, nil, xerrors.Errorf("Deal proposals did not match")
return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match")
}
}
return dealID, marketDeal, err
return dealID, marketDeal, lookup.TipSet, err
}
func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {

View File

@ -209,7 +209,7 @@ func TestGetCurrentDealInfo(t *testing.T) {
MarketDeals: marketDeals,
}
dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
require.Equal(t, data.expectedDealID, dealID)
require.Equal(t, data.expectedMarketDeal, marketDeal)
if data.expectedError == nil {
@ -236,6 +236,18 @@ type mockGetCurrentDealInfoAPI struct {
MarketDeals map[marketDealKey]*api.MarketDeal
}
func (mapi *mockGetCurrentDealInfoAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
panic("implement me")
}
func (mapi *mockGetCurrentDealInfoAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
panic("implement me")
}
func (mapi *mockGetCurrentDealInfoAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
panic("implement me")
}
func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
if !ok {

View File

@ -5,16 +5,21 @@ import (
"context"
"sync"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)
type sectorCommittedEventsAPI interface {
@ -32,7 +37,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback
@ -46,6 +51,52 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return true, false, nil
}
if publishTs == types.EmptyTSK {
lookup, err := api.StateSearchMsg(ctx, *publishCid)
if err != nil {
return false, false, err
}
if lookup != nil { // can be nil in tests
publishTs = lookup.TipSet
}
}
store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(api)))
publishAct, err := api.StateGetActor(ctx, provider, publishTs)
if err != nil {
return false, false, xerrors.Errorf("getting provider actor: %w", err)
}
curAct, err := api.StateGetActor(ctx, provider, ts.Key())
if err != nil {
return false, false, xerrors.Errorf("getting provider actor: %w", err)
}
curSt, err := miner.Load(store, curAct)
if err != nil {
return false, false, xerrors.Errorf("leading miner actor: %w", err)
}
pubSt, err := miner.Load(store, publishAct)
if err != nil {
return false, false, xerrors.Errorf("leading miner actor: %w", err)
}
diff, err := miner.DiffPreCommits(pubSt, curSt)
if err != nil {
return false, false, xerrors.Errorf("diff precommits: %w", err)
}
for _, info := range diff.Added {
for _, d := range info.Info.DealIDs {
if d == di {
cb(info.Info.SectorNumber, false, nil)
return true, false, nil
}
}
}
// Not yet active, start matching against incoming messages
return false, true, nil
}
@ -88,7 +139,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
// When the deal is published, the deal ID may change, so get the
// current deal ID from the publish message CID
dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, err
}
@ -130,7 +181,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
_, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback
@ -186,7 +237,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
}
// Get the deal info
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
_, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
@ -216,22 +267,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil
}
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) {
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) {
di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
// Sector with deal is already active
if sd.State.SectorStartEpoch > 0 {
return true, nil
return 0, true, publishTs, nil
}
// Sector was slashed
if sd.State.SlashEpoch > 0 {
return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
}
return false, nil
return di, false, publishTs, nil
}