Merge pull request #7220 from filecoin-project/revert/on-deal-exp-changes
revert changes to OnDealExpiredOrChanged in #5431 #7201
This commit is contained in:
commit
14bfd00c0e
@ -820,11 +820,6 @@ workflows:
|
|||||||
suite: itest-deals_concurrent
|
suite: itest-deals_concurrent
|
||||||
target: "./itests/deals_concurrent_test.go"
|
target: "./itests/deals_concurrent_test.go"
|
||||||
|
|
||||||
- test:
|
|
||||||
name: test-itest-deals_expiry
|
|
||||||
suite: itest-deals_expiry
|
|
||||||
target: "./itests/deals_expiry_test.go"
|
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-deals_offline
|
name: test-itest-deals_offline
|
||||||
suite: itest-deals_offline
|
suite: itest-deals_offline
|
||||||
@ -850,11 +845,6 @@ workflows:
|
|||||||
suite: itest-deals_publish
|
suite: itest-deals_publish
|
||||||
target: "./itests/deals_publish_test.go"
|
target: "./itests/deals_publish_test.go"
|
||||||
|
|
||||||
- test:
|
|
||||||
name: test-itest-deals_slash
|
|
||||||
suite: itest-deals_slash
|
|
||||||
target: "./itests/deals_slash_test.go"
|
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-deals
|
name: test-itest-deals
|
||||||
suite: itest-deals
|
suite: itest-deals
|
||||||
|
@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
|||||||
// Apply any queued events and timeouts that were targeted at the
|
// Apply any queued events and timeouts that were targeted at the
|
||||||
// current chain height
|
// current chain height
|
||||||
e.applyWithConfidence(ts, at)
|
e.applyWithConfidence(ts, at)
|
||||||
e.applyTimeouts(at)
|
e.applyTimeouts(ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the latest known tipset
|
// Update the latest known tipset
|
||||||
@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Apply any timeouts that expire at this height
|
// Apply any timeouts that expire at this height
|
||||||
func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) {
|
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
|
||||||
triggers, ok := e.timeouts[at]
|
triggers, ok := e.timeouts[ts.Height()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return // nothing to do
|
return // nothing to do
|
||||||
}
|
}
|
||||||
@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence))
|
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
||||||
}
|
}
|
||||||
|
|
||||||
more, err := trigger.handle(nil, nil, timeoutTs, at)
|
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
||||||
continue // don't revert failed calls
|
continue // don't revert failed calls
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1293,43 +1293,6 @@ func TestStateChangedRevert(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStateChangedTimeout(t *testing.T) {
|
func TestStateChangedTimeout(t *testing.T) {
|
||||||
timeoutHeight := abi.ChainEpoch(20)
|
|
||||||
confidence := 3
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
checkFn CheckFunc
|
|
||||||
nilBlocks []int
|
|
||||||
expectTimeout bool
|
|
||||||
}{{
|
|
||||||
// Verify that the state changed timeout is called at the expected height
|
|
||||||
name: "state changed timeout",
|
|
||||||
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
|
|
||||||
return false, true, nil
|
|
||||||
},
|
|
||||||
expectTimeout: true,
|
|
||||||
}, {
|
|
||||||
// Verify that the state changed timeout is called even if the timeout
|
|
||||||
// falls on nil block
|
|
||||||
name: "state changed timeout falls on nil block",
|
|
||||||
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
|
|
||||||
return false, true, nil
|
|
||||||
},
|
|
||||||
nilBlocks: []int{20, 21, 22, 23},
|
|
||||||
expectTimeout: true,
|
|
||||||
}, {
|
|
||||||
// Verify that the state changed timeout is not called if the check
|
|
||||||
// function reports that it's complete
|
|
||||||
name: "no timeout callback if check func reports done",
|
|
||||||
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
|
|
||||||
return true, true, nil
|
|
||||||
},
|
|
||||||
expectTimeout: false,
|
|
||||||
}}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
tc := tc
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
fcs := &fakeCS{
|
fcs := &fakeCS{
|
||||||
t: t,
|
t: t,
|
||||||
h: 1,
|
h: 1,
|
||||||
@ -1343,38 +1306,68 @@ func TestStateChangedTimeout(t *testing.T) {
|
|||||||
|
|
||||||
events := NewEvents(context.Background(), fcs)
|
events := NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
// Track whether the callback was called
|
|
||||||
called := false
|
called := false
|
||||||
|
|
||||||
// Set up state change tracking that will timeout at the given height
|
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
err := events.StateChanged(
|
return false, true, nil
|
||||||
tc.checkFn,
|
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||||
func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
|
||||||
// Expect the callback to be called at the timeout height with nil data
|
|
||||||
called = true
|
called = true
|
||||||
require.Nil(t, data)
|
require.Nil(t, data)
|
||||||
require.Equal(t, timeoutHeight, newTs.Height())
|
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||||
require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
|
require.Equal(t, abi.ChainEpoch(23), curH)
|
||||||
return false, nil
|
return false, nil
|
||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
t.Fatal("revert on timeout")
|
t.Fatal("revert on timeout")
|
||||||
return nil
|
return nil
|
||||||
}, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||||
return false, nil, nil
|
return false, nil, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Advance to timeout height
|
fcs.advance(0, 21, nil)
|
||||||
fcs.advance(0, int(timeoutHeight)+1, nil)
|
|
||||||
require.False(t, called)
|
require.False(t, called)
|
||||||
|
|
||||||
// Advance past timeout height
|
fcs.advance(0, 5, nil)
|
||||||
fcs.advance(0, 5, nil, tc.nilBlocks...)
|
require.True(t, called)
|
||||||
require.Equal(t, tc.expectTimeout, called)
|
|
||||||
called = false
|
called = false
|
||||||
})
|
|
||||||
|
// with check func reporting done
|
||||||
|
|
||||||
|
fcs = &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
callNumber: map[string]int{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
}
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events = NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return true, true, nil
|
||||||
|
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
called = true
|
||||||
|
require.Nil(t, data)
|
||||||
|
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||||
|
require.Equal(t, abi.ChainEpoch(23), curH)
|
||||||
|
return false, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
t.Fatal("revert on timeout")
|
||||||
|
return nil
|
||||||
|
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||||
|
return false, nil, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 21, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
|
||||||
|
fcs.advance(0, 5, nil)
|
||||||
|
require.False(t, called)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalledMultiplePerEpoch(t *testing.T) {
|
func TestCalledMultiplePerEpoch(t *testing.T) {
|
||||||
|
2
go.mod
2
go.mod
@ -36,7 +36,7 @@ require (
|
|||||||
github.com/filecoin-project/go-data-transfer v1.7.6
|
github.com/filecoin-project/go-data-transfer v1.7.6
|
||||||
github.com/filecoin-project/go-fil-commcid v0.1.0
|
github.com/filecoin-project/go-fil-commcid v0.1.0
|
||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
||||||
github.com/filecoin-project/go-fil-markets v1.9.0
|
github.com/filecoin-project/go-fil-markets v1.8.1
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
|
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
|
||||||
github.com/filecoin-project/go-multistore v0.0.3
|
github.com/filecoin-project/go-multistore v0.0.3
|
||||||
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1
|
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1
|
||||||
|
4
go.sum
4
go.sum
@ -291,8 +291,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+
|
|||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
|
||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
|
||||||
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
|
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
|
||||||
github.com/filecoin-project/go-fil-markets v1.9.0 h1:atoORQmjN1SEjB4RKj3uvPCqL9Jcs2RZ1GHKefstkxw=
|
github.com/filecoin-project/go-fil-markets v1.8.1 h1:nNJB5EIp5c6yo/z51DloVaL7T24SslCoxSDOXwNQr9k=
|
||||||
github.com/filecoin-project/go-fil-markets v1.9.0/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4=
|
github.com/filecoin-project/go-fil-markets v1.8.1/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||||
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
||||||
|
@ -1,140 +0,0 @@
|
|||||||
package itests
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
|
||||||
market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
|
|
||||||
market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market"
|
|
||||||
market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/itests/kit"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test that the deal state eventually moves to "Expired" on both client and miner
|
|
||||||
func TestDealExpiry(t *testing.T) {
|
|
||||||
kit.QuietMiningLogs()
|
|
||||||
|
|
||||||
resetMinDealDuration(t)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
var (
|
|
||||||
client kit.TestFullNode
|
|
||||||
miner1 kit.TestMiner
|
|
||||||
)
|
|
||||||
|
|
||||||
ens := kit.NewEnsemble(t, kit.MockProofs())
|
|
||||||
ens.FullNode(&client)
|
|
||||||
ens.Miner(&miner1, &client, kit.WithAllSubsystems())
|
|
||||||
bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond)
|
|
||||||
|
|
||||||
dh := kit.NewDealHarness(t, &client, &miner1, &miner1)
|
|
||||||
|
|
||||||
client.WaitTillChain(ctx, kit.HeightAtLeast(5))
|
|
||||||
|
|
||||||
// Make a deal with a short duration
|
|
||||||
dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
|
|
||||||
Rseed: 0,
|
|
||||||
FastRet: true,
|
|
||||||
// Needs to be far enough in the future to ensure the deal has been sealed
|
|
||||||
StartEpoch: 3000,
|
|
||||||
// Short deal duration
|
|
||||||
MinBlocksDuration: 50,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Inject null blocks each time the chain advances by a block so as to
|
|
||||||
// get to deal expiration faster
|
|
||||||
go func() {
|
|
||||||
ch, _ := client.ChainNotify(ctx)
|
|
||||||
for range ch {
|
|
||||||
bm[0].InjectNulls(10)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
clientExpired := false
|
|
||||||
minerExpired := false
|
|
||||||
for {
|
|
||||||
ts, err := client.ChainHead(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Logf("Chain height: %d", ts.Height())
|
|
||||||
|
|
||||||
// Get the miner deal from the proposal CID
|
|
||||||
minerDeal := getMinerDeal(ctx, t, miner1, *dealProposalCid)
|
|
||||||
|
|
||||||
t.Logf("Miner deal:")
|
|
||||||
t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider)
|
|
||||||
t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch)
|
|
||||||
t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch)
|
|
||||||
t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State])
|
|
||||||
//spew.Dump(d)
|
|
||||||
|
|
||||||
// Get the client deal
|
|
||||||
clientDeals, err := client.ClientListDeals(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State])
|
|
||||||
|
|
||||||
// Expect the deal to eventually expire on the client and the miner
|
|
||||||
if clientDeals[0].State == storagemarket.StorageDealExpired {
|
|
||||||
t.Logf("Client deal expired")
|
|
||||||
clientExpired = true
|
|
||||||
}
|
|
||||||
if minerDeal.State == storagemarket.StorageDealExpired {
|
|
||||||
t.Logf("Miner deal expired")
|
|
||||||
minerExpired = true
|
|
||||||
}
|
|
||||||
if clientExpired && minerExpired {
|
|
||||||
t.Logf("PASS: Client and miner deal expired")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if ts.Height() > 5000 {
|
|
||||||
t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height())
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMinerDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal {
|
|
||||||
minerDeals, err := miner1.MarketListIncompleteDeals(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Greater(t, len(minerDeals), 0)
|
|
||||||
|
|
||||||
for _, d := range minerDeals {
|
|
||||||
if d.ProposalCid == dealProposalCid {
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid)
|
|
||||||
return storagemarket.MinerDeal{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset minimum deal duration to 0, so we can make very short-lived deals.
|
|
||||||
// NOTE: this will need updating with every new specs-actors version.
|
|
||||||
func resetMinDealDuration(t *testing.T) {
|
|
||||||
m2 := market2.DealMinDuration
|
|
||||||
m3 := market3.DealMinDuration
|
|
||||||
m4 := market4.DealMinDuration
|
|
||||||
m5 := market5.DealMinDuration
|
|
||||||
|
|
||||||
market2.DealMinDuration = 0
|
|
||||||
market3.DealMinDuration = 0
|
|
||||||
market4.DealMinDuration = 0
|
|
||||||
market5.DealMinDuration = 0
|
|
||||||
|
|
||||||
t.Cleanup(func() {
|
|
||||||
market2.DealMinDuration = m2
|
|
||||||
market3.DealMinDuration = m3
|
|
||||||
market4.DealMinDuration = m4
|
|
||||||
market5.DealMinDuration = m5
|
|
||||||
})
|
|
||||||
}
|
|
@ -1,166 +0,0 @@
|
|||||||
package itests
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
|
||||||
"github.com/filecoin-project/lotus/itests/kit"
|
|
||||||
"github.com/filecoin-project/lotus/node"
|
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test that when a miner terminates a sector containing a deal, the deal state
|
|
||||||
// eventually moves to "Slashed" on both client and miner
|
|
||||||
func TestDealSlashing(t *testing.T) {
|
|
||||||
kit.QuietMiningLogs()
|
|
||||||
_ = logging.SetLogLevel("sectors", "debug")
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
var (
|
|
||||||
client kit.TestFullNode
|
|
||||||
miner1 kit.TestMiner
|
|
||||||
)
|
|
||||||
|
|
||||||
// Set up sealing config so that there is no batching of terminate actions
|
|
||||||
var sealingCfgFn dtypes.GetSealingConfigFunc = func() (sealiface.Config, error) {
|
|
||||||
return sealiface.Config{
|
|
||||||
MaxWaitDealsSectors: 2,
|
|
||||||
MaxSealingSectors: 0,
|
|
||||||
MaxSealingSectorsForDeals: 0,
|
|
||||||
WaitDealsDelay: time.Second,
|
|
||||||
AlwaysKeepUnsealedCopy: true,
|
|
||||||
|
|
||||||
BatchPreCommits: true,
|
|
||||||
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize,
|
|
||||||
PreCommitBatchWait: time.Second,
|
|
||||||
PreCommitBatchSlack: time.Second,
|
|
||||||
|
|
||||||
AggregateCommits: true,
|
|
||||||
MinCommitBatch: 1,
|
|
||||||
MaxCommitBatch: 1,
|
|
||||||
CommitBatchWait: time.Second,
|
|
||||||
CommitBatchSlack: time.Second,
|
|
||||||
|
|
||||||
AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
|
|
||||||
|
|
||||||
TerminateBatchMin: 1,
|
|
||||||
TerminateBatchMax: 1,
|
|
||||||
TerminateBatchWait: time.Second,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
fn := func() dtypes.GetSealingConfigFunc { return sealingCfgFn }
|
|
||||||
sealingCfg := kit.ConstructorOpts(node.Override(new(dtypes.GetSealingConfigFunc), fn))
|
|
||||||
|
|
||||||
// Set up a client and miner
|
|
||||||
ens := kit.NewEnsemble(t, kit.MockProofs())
|
|
||||||
ens.FullNode(&client)
|
|
||||||
ens.Miner(&miner1, &client, kit.WithAllSubsystems(), sealingCfg)
|
|
||||||
ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond)
|
|
||||||
|
|
||||||
dh := kit.NewDealHarness(t, &client, &miner1, &miner1)
|
|
||||||
|
|
||||||
client.WaitTillChain(ctx, kit.HeightAtLeast(5))
|
|
||||||
|
|
||||||
// Make a storage deal
|
|
||||||
dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
|
|
||||||
Rseed: 0,
|
|
||||||
FastRet: true,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Get the miner deal from the proposal CID
|
|
||||||
minerDeal := getDealByProposalCid(ctx, t, miner1, *dealProposalCid)
|
|
||||||
|
|
||||||
// Terminate the sector containing the deal
|
|
||||||
t.Logf("Terminating sector %d containing deal %s", minerDeal.SectorNumber, dealProposalCid)
|
|
||||||
err := miner1.SectorTerminate(ctx, minerDeal.SectorNumber)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
clientExpired := false
|
|
||||||
minerExpired := false
|
|
||||||
for {
|
|
||||||
ts, err := client.ChainHead(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Logf("Chain height: %d", ts.Height())
|
|
||||||
|
|
||||||
// Get the miner deal from the proposal CID
|
|
||||||
minerDeal := getDealByProposalCid(ctx, t, miner1, *dealProposalCid)
|
|
||||||
// Get the miner state from the piece CID
|
|
||||||
mktDeal := getMarketDeal(ctx, t, miner1, minerDeal.Proposal.PieceCID)
|
|
||||||
|
|
||||||
t.Logf("Miner deal:")
|
|
||||||
t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider)
|
|
||||||
t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch)
|
|
||||||
t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch)
|
|
||||||
t.Logf(" SlashEpoch: %d", mktDeal.State.SlashEpoch)
|
|
||||||
t.Logf(" LastUpdatedEpoch: %d", mktDeal.State.LastUpdatedEpoch)
|
|
||||||
t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State])
|
|
||||||
//spew.Dump(d)
|
|
||||||
|
|
||||||
// Get the client deal
|
|
||||||
clientDeals, err := client.ClientListDeals(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State])
|
|
||||||
|
|
||||||
// Expect the deal to eventually be slashed on the client and the miner
|
|
||||||
if clientDeals[0].State == storagemarket.StorageDealSlashed {
|
|
||||||
t.Logf("Client deal slashed")
|
|
||||||
clientExpired = true
|
|
||||||
}
|
|
||||||
if minerDeal.State == storagemarket.StorageDealSlashed {
|
|
||||||
t.Logf("Miner deal slashed")
|
|
||||||
minerExpired = true
|
|
||||||
}
|
|
||||||
if clientExpired && minerExpired {
|
|
||||||
t.Logf("PASS: Client and miner deal slashed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if ts.Height() > 4000 {
|
|
||||||
t.Fatalf("Reached height %d without client and miner deals being slashed", ts.Height())
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMarketDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, pieceCid cid.Cid) api.MarketDeal {
|
|
||||||
mktDeals, err := miner1.MarketListDeals(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Greater(t, len(mktDeals), 0)
|
|
||||||
|
|
||||||
for _, d := range mktDeals {
|
|
||||||
if d.Proposal.PieceCID == pieceCid {
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Fatalf("miner deal with piece CID %s not found", pieceCid)
|
|
||||||
return api.MarketDeal{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDealByProposalCid(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal {
|
|
||||||
minerDeals, err := miner1.MarketListIncompleteDeals(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Greater(t, len(minerDeals), 0)
|
|
||||||
|
|
||||||
for _, d := range minerDeals {
|
|
||||||
if d.ProposalCid == dealProposalCid {
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid)
|
|
||||||
return storagemarket.MinerDeal{}
|
|
||||||
}
|
|
@ -36,7 +36,6 @@ type MakeFullDealParams struct {
|
|||||||
Rseed int
|
Rseed int
|
||||||
FastRet bool
|
FastRet bool
|
||||||
StartEpoch abi.ChainEpoch
|
StartEpoch abi.ChainEpoch
|
||||||
MinBlocksDuration uint64
|
|
||||||
UseCARFileForStorageDeal bool
|
UseCARFileForStorageDeal bool
|
||||||
|
|
||||||
// SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon
|
// SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon
|
||||||
@ -98,9 +97,6 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa
|
|||||||
dp.Data.Root = res.Root
|
dp.Data.Root = res.Root
|
||||||
dp.DealStartEpoch = params.StartEpoch
|
dp.DealStartEpoch = params.StartEpoch
|
||||||
dp.FastRetrieval = params.FastRet
|
dp.FastRetrieval = params.FastRet
|
||||||
if params.MinBlocksDuration > 0 {
|
|
||||||
dp.MinBlocksDuration = params.MinBlocksDuration
|
|
||||||
}
|
|
||||||
deal = dh.StartDeal(ctx, dp)
|
deal = dh.StartDeal(ctx, dp)
|
||||||
|
|
||||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||||
|
@ -18,7 +18,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
"github.com/filecoin-project/go-state-types/exitcode"
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
|
||||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
|
|
||||||
@ -40,8 +40,8 @@ type ClientNodeAdapter struct {
|
|||||||
|
|
||||||
fundmgr *market.FundManager
|
fundmgr *market.FundManager
|
||||||
ev *events.Events
|
ev *events.Events
|
||||||
|
dsMatcher *dealStateMatcher
|
||||||
scMgr *SectorCommittedManager
|
scMgr *SectorCommittedManager
|
||||||
deMgr *DealExpiryManager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientApi struct {
|
type clientApi struct {
|
||||||
@ -60,10 +60,9 @@ func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi ful
|
|||||||
|
|
||||||
fundmgr: fundmgr,
|
fundmgr: fundmgr,
|
||||||
ev: ev,
|
ev: ev,
|
||||||
|
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
|
||||||
}
|
}
|
||||||
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
|
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
|
||||||
dsMatcher := newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi)))
|
|
||||||
a.deMgr = NewDealExpiryManager(ev, capi, capi, dsMatcher)
|
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -250,8 +249,94 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
|
|||||||
return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb)
|
return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
// TODO: Replace dealID parameter with DealProposal
|
||||||
return c.deMgr.OnDealExpiredOrSlashed(ctx, publishCid, proposal, onDealExpired, onDealSlashed)
|
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
||||||
|
head, err := c.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("client: failed to get chain head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sd, err := c.StateMarketStorageDeal(ctx, dealID, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called immediately to check if the deal has already expired or been slashed
|
||||||
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
|
if ts == nil {
|
||||||
|
// keep listening for events
|
||||||
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the deal has already expired
|
||||||
|
if sd.Proposal.EndEpoch <= ts.Height() {
|
||||||
|
onDealExpired(nil)
|
||||||
|
return true, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there is no deal assume it's already been slashed
|
||||||
|
if sd.State.SectorStartEpoch < 0 {
|
||||||
|
onDealSlashed(ts.Height(), nil)
|
||||||
|
return true, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// No events have occurred yet, so return
|
||||||
|
// done: false, more: true (keep listening for events)
|
||||||
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called when there was a match against the state change we're looking for
|
||||||
|
// and the chain has advanced to the confidence height
|
||||||
|
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
||||||
|
// Check if the deal has already expired
|
||||||
|
if ts2 == nil || sd.Proposal.EndEpoch <= ts2.Height() {
|
||||||
|
onDealExpired(nil)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout waiting for state change
|
||||||
|
if states == nil {
|
||||||
|
log.Error("timed out waiting for deal expiry")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
changedDeals, ok := states.(state.ChangedDeals)
|
||||||
|
if !ok {
|
||||||
|
panic("Expected state.ChangedDeals")
|
||||||
|
}
|
||||||
|
|
||||||
|
deal, ok := changedDeals[dealID]
|
||||||
|
if !ok {
|
||||||
|
// No change to deal
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deal was slashed
|
||||||
|
if deal.To == nil {
|
||||||
|
onDealSlashed(ts2.Height(), nil)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called when there was a chain reorg and the state change was reverted
|
||||||
|
revert := func(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
// TODO: Is it ok to just ignore this?
|
||||||
|
log.Warn("deal state reverted; TODO: actually handle this!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch for state changes to the deal
|
||||||
|
match := c.dsMatcher.matcher(ctx, dealID)
|
||||||
|
|
||||||
|
// Wait until after the end epoch for the deal and then timeout
|
||||||
|
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
||||||
|
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
|
||||||
|
return xerrors.Errorf("failed to set up state changed handler: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal market2.DealProposal) (*market2.ClientDealProposal, error) {
|
func (c *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal market2.DealProposal) (*market2.ClientDealProposal, error) {
|
||||||
|
@ -1,152 +0,0 @@
|
|||||||
package storageadapter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
|
||||||
"github.com/filecoin-project/lotus/chain/events/state"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
|
||||||
)
|
|
||||||
|
|
||||||
type demEventsAPI interface {
|
|
||||||
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
|
|
||||||
StateChanged(check events.CheckFunc, scHnd events.StateChangeHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.StateMatchFunc) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type demChainAPI interface {
|
|
||||||
ChainHead(context.Context) (*types.TipSet, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type DealExpiryManagerAPI interface {
|
|
||||||
demEventsAPI
|
|
||||||
demChainAPI
|
|
||||||
GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type dealExpiryManagerAdapter struct {
|
|
||||||
demEventsAPI
|
|
||||||
demChainAPI
|
|
||||||
*sealing.CurrentDealInfoManager
|
|
||||||
}
|
|
||||||
|
|
||||||
type DealExpiryManager struct {
|
|
||||||
demAPI DealExpiryManagerAPI
|
|
||||||
dsMatcher *dealStateMatcher
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDealExpiryManager(ev demEventsAPI, ch demChainAPI, tskAPI sealing.CurrentDealInfoTskAPI, dsMatcher *dealStateMatcher) *DealExpiryManager {
|
|
||||||
dim := &sealing.CurrentDealInfoManager{
|
|
||||||
CDAPI: &sealing.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI},
|
|
||||||
}
|
|
||||||
|
|
||||||
adapter := &dealExpiryManagerAdapter{
|
|
||||||
demEventsAPI: ev,
|
|
||||||
demChainAPI: ch,
|
|
||||||
CurrentDealInfoManager: dim,
|
|
||||||
}
|
|
||||||
return newDealExpiryManager(adapter, dsMatcher)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newDealExpiryManager(demAPI DealExpiryManagerAPI, dsMatcher *dealStateMatcher) *DealExpiryManager {
|
|
||||||
return &DealExpiryManager{demAPI: demAPI, dsMatcher: dsMatcher}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
|
||||||
head, err := mgr.demAPI.ChainHead(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("client: failed to get chain head: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
prop := market.DealProposal(proposal)
|
|
||||||
res, err := mgr.demAPI.GetCurrentDealInfo(ctx, head.Key().Bytes(), &prop, publishCid)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("awaiting deal expired: getting deal info: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called immediately to check if the deal has already expired or been slashed
|
|
||||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
|
||||||
if ts == nil {
|
|
||||||
// keep listening for events
|
|
||||||
return false, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the deal has already expired
|
|
||||||
if res.MarketDeal.Proposal.EndEpoch <= ts.Height() {
|
|
||||||
onDealExpired(nil)
|
|
||||||
return true, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there is no deal assume it's already been slashed
|
|
||||||
if res.MarketDeal.State.SectorStartEpoch < 0 {
|
|
||||||
onDealSlashed(ts.Height(), nil)
|
|
||||||
return true, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// No events have occurred yet, so return
|
|
||||||
// done: false, more: true (keep listening for events)
|
|
||||||
return false, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called when there was a match against the state change we're looking for
|
|
||||||
// and the chain has advanced to the confidence height
|
|
||||||
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
|
||||||
// Check if the deal has already expired
|
|
||||||
if res.MarketDeal.Proposal.EndEpoch <= h {
|
|
||||||
onDealExpired(nil)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timeout waiting for state change
|
|
||||||
if states == nil {
|
|
||||||
log.Errorf("timed out waiting for deal expiry for deal with piece CID %s", proposal.PieceCID)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
changedDeals, ok := states.(state.ChangedDeals)
|
|
||||||
if !ok {
|
|
||||||
return false, xerrors.Errorf("OnDealExpireOrSlashed stateChanged: Expected state.ChangedDeals")
|
|
||||||
}
|
|
||||||
|
|
||||||
deal, ok := changedDeals[res.DealID]
|
|
||||||
if !ok {
|
|
||||||
// No change to deal
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deal was slashed
|
|
||||||
if deal.To == nil || deal.To.SlashEpoch > 0 {
|
|
||||||
onDealSlashed(ts2.Height(), nil)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called when there was a chain reorg and the state change was reverted
|
|
||||||
revert := func(ctx context.Context, ts *types.TipSet) error {
|
|
||||||
// TODO: Is it ok to just ignore this?
|
|
||||||
log.Warn("deal state reverted; TODO: actually handle this!")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch for state changes to the deal
|
|
||||||
match := mgr.dsMatcher.matcher(ctx, res.DealID)
|
|
||||||
|
|
||||||
// Wait until after the end epoch for the deal and then timeout
|
|
||||||
timeout := res.MarketDeal.Proposal.EndEpoch + 1
|
|
||||||
if err := mgr.demAPI.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
|
|
||||||
return xerrors.Errorf("failed to set up state changed handler: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -18,8 +18,6 @@ import (
|
|||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
"github.com/filecoin-project/go-state-types/exitcode"
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
"github.com/filecoin-project/lotus/chain/events/state"
|
|
||||||
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
@ -28,6 +26,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events/state"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
@ -52,8 +51,8 @@ type ProviderNodeAdapter struct {
|
|||||||
|
|
||||||
addBalanceSpec *api.MessageSendSpec
|
addBalanceSpec *api.MessageSendSpec
|
||||||
maxDealCollateralMultiplier uint64
|
maxDealCollateralMultiplier uint64
|
||||||
|
dsMatcher *dealStateMatcher
|
||||||
scMgr *SectorCommittedManager
|
scMgr *SectorCommittedManager
|
||||||
deMgr *DealExpiryManager
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
|
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
|
||||||
@ -67,6 +66,7 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf
|
|||||||
secb: secb,
|
secb: secb,
|
||||||
ev: ev,
|
ev: ev,
|
||||||
dealPublisher: dealPublisher,
|
dealPublisher: dealPublisher,
|
||||||
|
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))),
|
||||||
}
|
}
|
||||||
if fc != nil {
|
if fc != nil {
|
||||||
na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)}
|
na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)}
|
||||||
@ -77,9 +77,6 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConf
|
|||||||
}
|
}
|
||||||
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
|
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
|
||||||
|
|
||||||
dsMatcher := newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full)))
|
|
||||||
na.deMgr = NewDealExpiryManager(ev, full, full, dsMatcher)
|
|
||||||
|
|
||||||
return na
|
return na
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -331,8 +328,93 @@ func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Addre
|
|||||||
return sp, err
|
return sp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, publishCid cid.Cid, proposal market0.DealProposal, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
||||||
return n.deMgr.OnDealExpiredOrSlashed(ctx, publishCid, proposal, onDealExpired, onDealSlashed)
|
head, err := n.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("client: failed to get chain head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called immediately to check if the deal has already expired or been slashed
|
||||||
|
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
|
if ts == nil {
|
||||||
|
// keep listening for events
|
||||||
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the deal has already expired
|
||||||
|
if sd.Proposal.EndEpoch <= ts.Height() {
|
||||||
|
onDealExpired(nil)
|
||||||
|
return true, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there is no deal assume it's already been slashed
|
||||||
|
if sd.State.SectorStartEpoch < 0 {
|
||||||
|
onDealSlashed(ts.Height(), nil)
|
||||||
|
return true, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// No events have occurred yet, so return
|
||||||
|
// done: false, more: true (keep listening for events)
|
||||||
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called when there was a match against the state change we're looking for
|
||||||
|
// and the chain has advanced to the confidence height
|
||||||
|
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
||||||
|
// Check if the deal has already expired
|
||||||
|
if ts2 == nil || sd.Proposal.EndEpoch <= ts2.Height() {
|
||||||
|
onDealExpired(nil)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout waiting for state change
|
||||||
|
if states == nil {
|
||||||
|
log.Error("timed out waiting for deal expiry")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
changedDeals, ok := states.(state.ChangedDeals)
|
||||||
|
if !ok {
|
||||||
|
panic("Expected state.ChangedDeals")
|
||||||
|
}
|
||||||
|
|
||||||
|
deal, ok := changedDeals[dealID]
|
||||||
|
if !ok {
|
||||||
|
// No change to deal
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deal was slashed
|
||||||
|
if deal.To == nil {
|
||||||
|
onDealSlashed(ts2.Height(), nil)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called when there was a chain reorg and the state change was reverted
|
||||||
|
revert := func(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
// TODO: Is it ok to just ignore this?
|
||||||
|
log.Warn("deal state reverted; TODO: actually handle this!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch for state changes to the deal
|
||||||
|
match := n.dsMatcher.matcher(ctx, dealID)
|
||||||
|
|
||||||
|
// Wait until after the end epoch for the deal and then timeout
|
||||||
|
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
||||||
|
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
|
||||||
|
return xerrors.Errorf("failed to set up state changed handler: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}
|
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}
|
||||||
|
@ -525,7 +525,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
|
|||||||
earliest := abi.ChainEpoch(sealEpochs) + ht
|
earliest := abi.ChainEpoch(sealEpochs) + ht
|
||||||
if deal.Proposal.StartEpoch < earliest {
|
if deal.Proposal.StartEpoch < earliest {
|
||||||
log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht)
|
log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht)
|
||||||
return false, fmt.Sprintf("proposed deal start epoch %s too early, cannot seal a sector before %s", deal.Proposal.StartEpoch, earliest), nil
|
return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sd, err := startDelay()
|
sd, err := startDelay()
|
||||||
|
Loading…
Reference in New Issue
Block a user