diff --git a/.circleci/config.yml b/.circleci/config.yml index dfaa3fb09..d570e303c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -820,11 +820,6 @@ workflows: suite: itest-deals_concurrent target: "./itests/deals_concurrent_test.go" - - test: - name: test-itest-deals_expiry - suite: itest-deals_expiry - target: "./itests/deals_expiry_test.go" - - test: name: test-itest-deals_offline suite: itest-deals_offline @@ -850,11 +845,6 @@ workflows: suite: itest-deals_publish target: "./itests/deals_publish_test.go" - - test: - name: test-itest-deals_slash - suite: itest-deals_slash - target: "./itests/deals_slash_test.go" - - test: name: test-itest-deals suite: itest-deals diff --git a/chain/events/events_called.go b/chain/events/events_called.go index e783f7800..1f0b80169 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { // Apply any queued events and timeouts that were targeted at the // current chain height e.applyWithConfidence(ts, at) - e.applyTimeouts(at) + e.applyTimeouts(ts) } // Update the latest known tipset @@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) } // Apply any timeouts that expire at this height -func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) { - triggers, ok := e.timeouts[at] +func (e *hcEvents) applyTimeouts(ts *types.TipSet) { + triggers, ok := e.timeouts[ts.Height()] if !ok { return // nothing to do } @@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) { continue } - timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence)) + timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence)) if err != nil { - log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at) + log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height()) } - more, err := trigger.handle(nil, nil, timeoutTs, at) + more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) if err != nil { - log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err) + log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) continue // don't revert failed calls } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 6f73dfe58..04f938055 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1293,88 +1293,81 @@ func TestStateChangedRevert(t *testing.T) { } func TestStateChangedTimeout(t *testing.T) { - timeoutHeight := abi.ChainEpoch(20) - confidence := 3 + fcs := &fakeCS{ + t: t, + h: 1, - testCases := []struct { - name string - checkFn CheckFunc - nilBlocks []int - expectTimeout bool - }{{ - // Verify that the state changed timeout is called at the expected height - name: "state changed timeout", - checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { - return false, true, nil - }, - expectTimeout: true, - }, { - // Verify that the state changed timeout is called even if the timeout - // falls on nil block - name: "state changed timeout falls on nil block", - checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { - return false, true, nil - }, - nilBlocks: []int{20, 21, 22, 23}, - expectTimeout: true, - }, { - // Verify that the state changed timeout is not called if the check - // function reports that it's complete - name: "no timeout callback if check func reports done", - checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { - return true, true, nil - }, - expectTimeout: false, - }} - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - fcs := &fakeCS{ - t: t, - h: 1, - - msgs: map[cid.Cid]fakeMsg{}, - blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2*build.ForkLengthThreshold, nil), - callNumber: map[string]int{}, - } - require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) - - events := NewEvents(context.Background(), fcs) - - // Track whether the callback was called - called := false - - // Set up state change tracking that will timeout at the given height - err := events.StateChanged( - tc.checkFn, - func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { - // Expect the callback to be called at the timeout height with nil data - called = true - require.Nil(t, data) - require.Equal(t, timeoutHeight, newTs.Height()) - require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH) - return false, nil - }, func(_ context.Context, ts *types.TipSet) error { - t.Fatal("revert on timeout") - return nil - }, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { - return false, nil, nil - }) - - require.NoError(t, err) - - // Advance to timeout height - fcs.advance(0, int(timeoutHeight)+1, nil) - require.False(t, called) - - // Advance past timeout height - fcs.advance(0, 5, nil, tc.nilBlocks...) - require.Equal(t, tc.expectTimeout, called) - called = false - }) + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + callNumber: map[string]int{}, } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + called := false + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.True(t, called) + called = false + + // with check func reporting done + + fcs = &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + callNumber: map[string]int{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events = NewEvents(context.Background(), fcs) + + err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return true, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.False(t, called) } func TestCalledMultiplePerEpoch(t *testing.T) { diff --git a/go.mod b/go.mod index 62a96e02c..be7d911e1 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.7.6 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.9.0 + github.com/filecoin-project/go-fil-markets v1.8.1 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 diff --git a/go.sum b/go.sum index 507ca4d2d..f16a81a9f 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.9.0 h1:atoORQmjN1SEjB4RKj3uvPCqL9Jcs2RZ1GHKefstkxw= -github.com/filecoin-project/go-fil-markets v1.9.0/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4= +github.com/filecoin-project/go-fil-markets v1.8.1 h1:nNJB5EIp5c6yo/z51DloVaL7T24SslCoxSDOXwNQr9k= +github.com/filecoin-project/go-fil-markets v1.8.1/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/itests/deals_expiry_test.go b/itests/deals_expiry_test.go deleted file mode 100644 index b8b3c4b5a..000000000 --- a/itests/deals_expiry_test.go +++ /dev/null @@ -1,140 +0,0 @@ -package itests - -import ( - "context" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-fil-markets/storagemarket" - market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" - market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" - market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market" - market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market" - - "github.com/filecoin-project/lotus/itests/kit" -) - -// Test that the deal state eventually moves to "Expired" on both client and miner -func TestDealExpiry(t *testing.T) { - kit.QuietMiningLogs() - - resetMinDealDuration(t) - - ctx := context.Background() - - var ( - client kit.TestFullNode - miner1 kit.TestMiner - ) - - ens := kit.NewEnsemble(t, kit.MockProofs()) - ens.FullNode(&client) - ens.Miner(&miner1, &client, kit.WithAllSubsystems()) - bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond) - - dh := kit.NewDealHarness(t, &client, &miner1, &miner1) - - client.WaitTillChain(ctx, kit.HeightAtLeast(5)) - - // Make a deal with a short duration - dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ - Rseed: 0, - FastRet: true, - // Needs to be far enough in the future to ensure the deal has been sealed - StartEpoch: 3000, - // Short deal duration - MinBlocksDuration: 50, - }) - - // Inject null blocks each time the chain advances by a block so as to - // get to deal expiration faster - go func() { - ch, _ := client.ChainNotify(ctx) - for range ch { - bm[0].InjectNulls(10) - } - }() - - clientExpired := false - minerExpired := false - for { - ts, err := client.ChainHead(ctx) - require.NoError(t, err) - - t.Logf("Chain height: %d", ts.Height()) - - // Get the miner deal from the proposal CID - minerDeal := getMinerDeal(ctx, t, miner1, *dealProposalCid) - - t.Logf("Miner deal:") - t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider) - t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch) - t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch) - t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State]) - //spew.Dump(d) - - // Get the client deal - clientDeals, err := client.ClientListDeals(ctx) - require.NoError(t, err) - - t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State]) - - // Expect the deal to eventually expire on the client and the miner - if clientDeals[0].State == storagemarket.StorageDealExpired { - t.Logf("Client deal expired") - clientExpired = true - } - if minerDeal.State == storagemarket.StorageDealExpired { - t.Logf("Miner deal expired") - minerExpired = true - } - if clientExpired && minerExpired { - t.Logf("PASS: Client and miner deal expired") - return - } - - if ts.Height() > 5000 { - t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height()) - } - - time.Sleep(2 * time.Second) - } -} - -func getMinerDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal { - minerDeals, err := miner1.MarketListIncompleteDeals(ctx) - require.NoError(t, err) - require.Greater(t, len(minerDeals), 0) - - for _, d := range minerDeals { - if d.ProposalCid == dealProposalCid { - return d - } - } - t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid) - return storagemarket.MinerDeal{} -} - -// reset minimum deal duration to 0, so we can make very short-lived deals. -// NOTE: this will need updating with every new specs-actors version. -func resetMinDealDuration(t *testing.T) { - m2 := market2.DealMinDuration - m3 := market3.DealMinDuration - m4 := market4.DealMinDuration - m5 := market5.DealMinDuration - - market2.DealMinDuration = 0 - market3.DealMinDuration = 0 - market4.DealMinDuration = 0 - market5.DealMinDuration = 0 - - t.Cleanup(func() { - market2.DealMinDuration = m2 - market3.DealMinDuration = m3 - market4.DealMinDuration = m4 - market5.DealMinDuration = m5 - }) -} diff --git a/itests/deals_slash_test.go b/itests/deals_slash_test.go deleted file mode 100644 index 929b69754..000000000 --- a/itests/deals_slash_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package itests - -import ( - "context" - "testing" - "time" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-fil-markets/storagemarket" - miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" - "github.com/filecoin-project/lotus/itests/kit" - "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/modules/dtypes" -) - -// Test that when a miner terminates a sector containing a deal, the deal state -// eventually moves to "Slashed" on both client and miner -func TestDealSlashing(t *testing.T) { - kit.QuietMiningLogs() - _ = logging.SetLogLevel("sectors", "debug") - - ctx := context.Background() - - var ( - client kit.TestFullNode - miner1 kit.TestMiner - ) - - // Set up sealing config so that there is no batching of terminate actions - var sealingCfgFn dtypes.GetSealingConfigFunc = func() (sealiface.Config, error) { - return sealiface.Config{ - MaxWaitDealsSectors: 2, - MaxSealingSectors: 0, - MaxSealingSectorsForDeals: 0, - WaitDealsDelay: time.Second, - AlwaysKeepUnsealedCopy: true, - - BatchPreCommits: true, - MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, - PreCommitBatchWait: time.Second, - PreCommitBatchSlack: time.Second, - - AggregateCommits: true, - MinCommitBatch: 1, - MaxCommitBatch: 1, - CommitBatchWait: time.Second, - CommitBatchSlack: time.Second, - - AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL - - TerminateBatchMin: 1, - TerminateBatchMax: 1, - TerminateBatchWait: time.Second, - }, nil - } - fn := func() dtypes.GetSealingConfigFunc { return sealingCfgFn } - sealingCfg := kit.ConstructorOpts(node.Override(new(dtypes.GetSealingConfigFunc), fn)) - - // Set up a client and miner - ens := kit.NewEnsemble(t, kit.MockProofs()) - ens.FullNode(&client) - ens.Miner(&miner1, &client, kit.WithAllSubsystems(), sealingCfg) - ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond) - - dh := kit.NewDealHarness(t, &client, &miner1, &miner1) - - client.WaitTillChain(ctx, kit.HeightAtLeast(5)) - - // Make a storage deal - dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ - Rseed: 0, - FastRet: true, - }) - - // Get the miner deal from the proposal CID - minerDeal := getDealByProposalCid(ctx, t, miner1, *dealProposalCid) - - // Terminate the sector containing the deal - t.Logf("Terminating sector %d containing deal %s", minerDeal.SectorNumber, dealProposalCid) - err := miner1.SectorTerminate(ctx, minerDeal.SectorNumber) - require.NoError(t, err) - - clientExpired := false - minerExpired := false - for { - ts, err := client.ChainHead(ctx) - require.NoError(t, err) - - t.Logf("Chain height: %d", ts.Height()) - - // Get the miner deal from the proposal CID - minerDeal := getDealByProposalCid(ctx, t, miner1, *dealProposalCid) - // Get the miner state from the piece CID - mktDeal := getMarketDeal(ctx, t, miner1, minerDeal.Proposal.PieceCID) - - t.Logf("Miner deal:") - t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider) - t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch) - t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch) - t.Logf(" SlashEpoch: %d", mktDeal.State.SlashEpoch) - t.Logf(" LastUpdatedEpoch: %d", mktDeal.State.LastUpdatedEpoch) - t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State]) - //spew.Dump(d) - - // Get the client deal - clientDeals, err := client.ClientListDeals(ctx) - require.NoError(t, err) - - t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State]) - - // Expect the deal to eventually be slashed on the client and the miner - if clientDeals[0].State == storagemarket.StorageDealSlashed { - t.Logf("Client deal slashed") - clientExpired = true - } - if minerDeal.State == storagemarket.StorageDealSlashed { - t.Logf("Miner deal slashed") - minerExpired = true - } - if clientExpired && minerExpired { - t.Logf("PASS: Client and miner deal slashed") - return - } - - if ts.Height() > 4000 { - t.Fatalf("Reached height %d without client and miner deals being slashed", ts.Height()) - } - - time.Sleep(2 * time.Second) - } -} - -func getMarketDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, pieceCid cid.Cid) api.MarketDeal { - mktDeals, err := miner1.MarketListDeals(ctx) - require.NoError(t, err) - require.Greater(t, len(mktDeals), 0) - - for _, d := range mktDeals { - if d.Proposal.PieceCID == pieceCid { - return d - } - } - t.Fatalf("miner deal with piece CID %s not found", pieceCid) - return api.MarketDeal{} -} - -func getDealByProposalCid(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal { - minerDeals, err := miner1.MarketListIncompleteDeals(ctx) - require.NoError(t, err) - require.Greater(t, len(minerDeals), 0) - - for _, d := range minerDeals { - if d.ProposalCid == dealProposalCid { - return d - } - } - t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid) - return storagemarket.MinerDeal{} -} diff --git a/itests/kit/deals.go b/itests/kit/deals.go index 804c18165..7d78d8c02 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -36,7 +36,6 @@ type MakeFullDealParams struct { Rseed int FastRet bool StartEpoch abi.ChainEpoch - MinBlocksDuration uint64 UseCARFileForStorageDeal bool // SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon @@ -98,9 +97,6 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa dp.Data.Root = res.Root dp.DealStartEpoch = params.StartEpoch dp.FastRetrieval = params.FastRet - if params.MinBlocksDuration > 0 { - dp.MinBlocksDuration = params.MinBlocksDuration - } deal = dh.StartDeal(ctx, dp) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 9b375cf08..80ead2be3 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -18,7 +18,7 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" - market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" @@ -38,10 +38,10 @@ import ( type ClientNodeAdapter struct { *clientApi - fundmgr *market.FundManager - ev *events.Events - scMgr *SectorCommittedManager - deMgr *DealExpiryManager + fundmgr *market.FundManager + ev *events.Events + dsMatcher *dealStateMatcher + scMgr *SectorCommittedManager } type clientApi struct { @@ -58,12 +58,11 @@ func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi ful a := &ClientNodeAdapter{ clientApi: capi, - fundmgr: fundmgr, - ev: ev, + fundmgr: fundmgr, + ev: ev, + dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))), } a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi}) - dsMatcher := newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))) - a.deMgr = NewDealExpiryManager(ev, capi, capi, dsMatcher) return a } @@ -250,8 +249,94 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb) } -func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { - return c.deMgr.OnDealExpiredOrSlashed(ctx, publishCid, proposal, onDealExpired, onDealSlashed) +// TODO: Replace dealID parameter with DealProposal +func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { + head, err := c.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("client: failed to get chain head: %w", err) + } + + sd, err := c.StateMarketStorageDeal(ctx, dealID, head.Key()) + if err != nil { + return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err) + } + + // Called immediately to check if the deal has already expired or been slashed + checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + if ts == nil { + // keep listening for events + return false, true, nil + } + + // Check if the deal has already expired + if sd.Proposal.EndEpoch <= ts.Height() { + onDealExpired(nil) + return true, false, nil + } + + // If there is no deal assume it's already been slashed + if sd.State.SectorStartEpoch < 0 { + onDealSlashed(ts.Height(), nil) + return true, false, nil + } + + // No events have occurred yet, so return + // done: false, more: true (keep listening for events) + return false, true, nil + } + + // Called when there was a match against the state change we're looking for + // and the chain has advanced to the confidence height + stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { + // Check if the deal has already expired + if ts2 == nil || sd.Proposal.EndEpoch <= ts2.Height() { + onDealExpired(nil) + return false, nil + } + + // Timeout waiting for state change + if states == nil { + log.Error("timed out waiting for deal expiry") + return false, nil + } + + changedDeals, ok := states.(state.ChangedDeals) + if !ok { + panic("Expected state.ChangedDeals") + } + + deal, ok := changedDeals[dealID] + if !ok { + // No change to deal + return true, nil + } + + // Deal was slashed + if deal.To == nil { + onDealSlashed(ts2.Height(), nil) + return false, nil + } + + return true, nil + } + + // Called when there was a chain reorg and the state change was reverted + revert := func(ctx context.Context, ts *types.TipSet) error { + // TODO: Is it ok to just ignore this? + log.Warn("deal state reverted; TODO: actually handle this!") + return nil + } + + // Watch for state changes to the deal + match := c.dsMatcher.matcher(ctx, dealID) + + // Wait until after the end epoch for the deal and then timeout + timeout := (sd.Proposal.EndEpoch - head.Height()) + 1 + if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil { + return xerrors.Errorf("failed to set up state changed handler: %w", err) + } + + return nil } func (c *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal market2.DealProposal) (*market2.ClientDealProposal, error) { diff --git a/markets/storageadapter/ondealexpired.go b/markets/storageadapter/ondealexpired.go deleted file mode 100644 index 6767c6d34..000000000 --- a/markets/storageadapter/ondealexpired.go +++ /dev/null @@ -1,152 +0,0 @@ -package storageadapter - -import ( - "context" - - market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-state-types/abi" - sealing "github.com/filecoin-project/lotus/extern/storage-sealing" -) - -type demEventsAPI interface { - Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error - StateChanged(check events.CheckFunc, scHnd events.StateChangeHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.StateMatchFunc) error -} - -type demChainAPI interface { - ChainHead(context.Context) (*types.TipSet, error) -} - -type DealExpiryManagerAPI interface { - demEventsAPI - demChainAPI - GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error) -} - -type dealExpiryManagerAdapter struct { - demEventsAPI - demChainAPI - *sealing.CurrentDealInfoManager -} - -type DealExpiryManager struct { - demAPI DealExpiryManagerAPI - dsMatcher *dealStateMatcher -} - -func NewDealExpiryManager(ev demEventsAPI, ch demChainAPI, tskAPI sealing.CurrentDealInfoTskAPI, dsMatcher *dealStateMatcher) *DealExpiryManager { - dim := &sealing.CurrentDealInfoManager{ - CDAPI: &sealing.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI}, - } - - adapter := &dealExpiryManagerAdapter{ - demEventsAPI: ev, - demChainAPI: ch, - CurrentDealInfoManager: dim, - } - return newDealExpiryManager(adapter, dsMatcher) -} - -func newDealExpiryManager(demAPI DealExpiryManagerAPI, dsMatcher *dealStateMatcher) *DealExpiryManager { - return &DealExpiryManager{demAPI: demAPI, dsMatcher: dsMatcher} -} - -func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { - head, err := mgr.demAPI.ChainHead(ctx) - if err != nil { - return xerrors.Errorf("client: failed to get chain head: %w", err) - } - - prop := market.DealProposal(proposal) - res, err := mgr.demAPI.GetCurrentDealInfo(ctx, head.Key().Bytes(), &prop, publishCid) - if err != nil { - return xerrors.Errorf("awaiting deal expired: getting deal info: %w", err) - } - - // Called immediately to check if the deal has already expired or been slashed - checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - if ts == nil { - // keep listening for events - return false, true, nil - } - - // Check if the deal has already expired - if res.MarketDeal.Proposal.EndEpoch <= ts.Height() { - onDealExpired(nil) - return true, false, nil - } - - // If there is no deal assume it's already been slashed - if res.MarketDeal.State.SectorStartEpoch < 0 { - onDealSlashed(ts.Height(), nil) - return true, false, nil - } - - // No events have occurred yet, so return - // done: false, more: true (keep listening for events) - return false, true, nil - } - - // Called when there was a match against the state change we're looking for - // and the chain has advanced to the confidence height - stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { - // Check if the deal has already expired - if res.MarketDeal.Proposal.EndEpoch <= h { - onDealExpired(nil) - return false, nil - } - - // Timeout waiting for state change - if states == nil { - log.Errorf("timed out waiting for deal expiry for deal with piece CID %s", proposal.PieceCID) - return false, nil - } - - changedDeals, ok := states.(state.ChangedDeals) - if !ok { - return false, xerrors.Errorf("OnDealExpireOrSlashed stateChanged: Expected state.ChangedDeals") - } - - deal, ok := changedDeals[res.DealID] - if !ok { - // No change to deal - return true, nil - } - - // Deal was slashed - if deal.To == nil || deal.To.SlashEpoch > 0 { - onDealSlashed(ts2.Height(), nil) - return false, nil - } - - return true, nil - } - - // Called when there was a chain reorg and the state change was reverted - revert := func(ctx context.Context, ts *types.TipSet) error { - // TODO: Is it ok to just ignore this? - log.Warn("deal state reverted; TODO: actually handle this!") - return nil - } - - // Watch for state changes to the deal - match := mgr.dsMatcher.matcher(ctx, res.DealID) - - // Wait until after the end epoch for the deal and then timeout - timeout := res.MarketDeal.Proposal.EndEpoch + 1 - if err := mgr.demAPI.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil { - return xerrors.Errorf("failed to set up state changed handler: %w", err) - } - - return nil -} diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index d3d442623..23a3c32a8 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -18,8 +18,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/lotus/chain/events/state" - market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" @@ -28,6 +26,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/lib/sigs" @@ -52,8 +51,8 @@ type ProviderNodeAdapter struct { addBalanceSpec *api.MessageSendSpec maxDealCollateralMultiplier uint64 + dsMatcher *dealStateMatcher scMgr *SectorCommittedManager - deMgr *DealExpiryManager } func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { @@ -67,6 +66,7 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf secb: secb, ev: ev, dealPublisher: dealPublisher, + dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), } if fc != nil { na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)} @@ -77,9 +77,6 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf } na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full}) - dsMatcher := newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))) - na.deMgr = NewDealExpiryManager(ev, full, full, dsMatcher) - return na } } @@ -331,8 +328,93 @@ func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Addre return sp, err } -func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { - return n.deMgr.OnDealExpiredOrSlashed(ctx, publishCid, proposal, onDealExpired, onDealSlashed) +func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { + head, err := n.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("client: failed to get chain head: %w", err) + } + + sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key()) + if err != nil { + return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err) + } + + // Called immediately to check if the deal has already expired or been slashed + checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + if ts == nil { + // keep listening for events + return false, true, nil + } + + // Check if the deal has already expired + if sd.Proposal.EndEpoch <= ts.Height() { + onDealExpired(nil) + return true, false, nil + } + + // If there is no deal assume it's already been slashed + if sd.State.SectorStartEpoch < 0 { + onDealSlashed(ts.Height(), nil) + return true, false, nil + } + + // No events have occurred yet, so return + // done: false, more: true (keep listening for events) + return false, true, nil + } + + // Called when there was a match against the state change we're looking for + // and the chain has advanced to the confidence height + stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { + // Check if the deal has already expired + if ts2 == nil || sd.Proposal.EndEpoch <= ts2.Height() { + onDealExpired(nil) + return false, nil + } + + // Timeout waiting for state change + if states == nil { + log.Error("timed out waiting for deal expiry") + return false, nil + } + + changedDeals, ok := states.(state.ChangedDeals) + if !ok { + panic("Expected state.ChangedDeals") + } + + deal, ok := changedDeals[dealID] + if !ok { + // No change to deal + return true, nil + } + + // Deal was slashed + if deal.To == nil { + onDealSlashed(ts2.Height(), nil) + return false, nil + } + + return true, nil + } + + // Called when there was a chain reorg and the state change was reverted + revert := func(ctx context.Context, ts *types.TipSet) error { + // TODO: Is it ok to just ignore this? + log.Warn("deal state reverted; TODO: actually handle this!") + return nil + } + + // Watch for state changes to the deal + match := n.dsMatcher.matcher(ctx, dealID) + + // Wait until after the end epoch for the deal and then timeout + timeout := (sd.Proposal.EndEpoch - head.Height()) + 1 + if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil { + return xerrors.Errorf("failed to set up state changed handler: %w", err) + } + + return nil } var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 13e751be4..075eed99d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -525,7 +525,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside earliest := abi.ChainEpoch(sealEpochs) + ht if deal.Proposal.StartEpoch < earliest { log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht) - return false, fmt.Sprintf("proposed deal start epoch %s too early, cannot seal a sector before %s", deal.Proposal.StartEpoch, earliest), nil + return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil } sd, err := startDelay()