diff --git a/markets/loggers/loggers.go b/markets/loggers/loggers.go index 87c8dfe65..e5f669f2f 100644 --- a/markets/loggers/loggers.go +++ b/markets/loggers/loggers.go @@ -12,22 +12,22 @@ var log = logging.Logger("markets") // StorageClientLogger logs events from the storage client func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { - log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // StorageProviderLogger logs events from the storage provider func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { - log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // RetrievalClientLogger logs events from the retrieval client func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // RetrievalProviderLogger logs events from the retrieval provider func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // DataTransferLogger logs events from the data transfer module diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index ab8c3f52f..5a8047207 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -18,47 +18,51 @@ type getCurrentDealInfoAPI interface { StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) + + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) + ChainHasObj(context.Context, cid.Cid) (bool, error) } // GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed -func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) { +func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) { marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) if dealErr == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if equal { - return dealID, marketDeal, nil + return dealID, marketDeal, types.EmptyTSK, nil } dealErr = xerrors.Errorf("Deal proposals did not match") } if publishCid == nil { - return dealID, nil, dealErr + return dealID, nil, types.EmptyTSK, dealErr } // attempt deal id correction lookup, err := api.StateSearchMsg(ctx, *publishCid) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if lookup.Receipt.ExitCode != exitcode.Ok { - return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) } var retval market.PublishStorageDealsReturn if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) } if len(retval.IDs) != 1 { // market currently only ever sends messages with 1 deal - return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") } if retval.IDs[0] == dealID { // DealID did not change, so we are stuck with the original lookup error - return dealID, nil, dealErr + return dealID, nil, lookup.TipSet, dealErr } dealID = retval.IDs[0] @@ -67,13 +71,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea if err == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if !equal { - return dealID, nil, xerrors.Errorf("Deal proposals did not match") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match") } } - return dealID, marketDeal, err + return dealID, marketDeal, lookup.TipSet, err } func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) { diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go index ed5d36c5b..46aaff2c3 100644 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ b/markets/storageadapter/getcurrentdealinfo_test.go @@ -209,7 +209,7 @@ func TestGetCurrentDealInfo(t *testing.T) { MarketDeals: marketDeals, } - dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) + dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) require.Equal(t, data.expectedDealID, dealID) require.Equal(t, data.expectedMarketDeal, marketDeal) if data.expectedError == nil { @@ -236,6 +236,18 @@ type mockGetCurrentDealInfoAPI struct { MarketDeals map[marketDealKey]*api.MarketDeal } +func (mapi *mockGetCurrentDealInfoAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + panic("implement me") +} + +func (mapi *mockGetCurrentDealInfoAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + panic("implement me") +} + +func (mapi *mockGetCurrentDealInfoAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + panic("implement me") +} + func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}] if !ok { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index bd59da750..7dd0a932f 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -5,16 +5,21 @@ import ( "context" "sync" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" ) type sectorCommittedEventsAPI interface { @@ -32,7 +37,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -46,6 +51,52 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } + if publishTs == types.EmptyTSK { + lookup, err := api.StateSearchMsg(ctx, *publishCid) + if err != nil { + return false, false, err + } + if lookup != nil { // can be nil in tests + publishTs = lookup.TipSet + } + } + + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(api))) + + publishAct, err := api.StateGetActor(ctx, provider, publishTs) + if err != nil { + return false, false, xerrors.Errorf("getting provider actor: %w", err) + } + + curAct, err := api.StateGetActor(ctx, provider, ts.Key()) + if err != nil { + return false, false, xerrors.Errorf("getting provider actor: %w", err) + } + + curSt, err := miner.Load(store, curAct) + if err != nil { + return false, false, xerrors.Errorf("leading miner actor: %w", err) + } + + pubSt, err := miner.Load(store, publishAct) + if err != nil { + return false, false, xerrors.Errorf("leading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(pubSt, curSt) + if err != nil { + return false, false, xerrors.Errorf("diff precommits: %w", err) + } + + for _, info := range diff.Added { + for _, d := range info.Info.DealIDs { + if d == di { + cb(info.Info.SectorNumber, false, nil) + return true, false, nil + } + } + } + // Not yet active, start matching against incoming messages return false, true, nil } @@ -88,7 +139,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // When the deal is published, the deal ID may change, so get the // current deal ID from the publish message CID - dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, err } @@ -130,7 +181,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + _, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -186,7 +237,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // Get the deal info - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + _, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } @@ -216,22 +267,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) { + di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active if sd.State.SectorStartEpoch > 0 { - return true, nil + return 0, true, publishTs, nil } // Sector was slashed if sd.State.SlashEpoch > 0 { - return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) } - return false, nil + return di, false, publishTs, nil }