From f79652c28c9acc67cda2fb09761971a1a1717389 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 28 Oct 2020 12:38:06 +0100 Subject: [PATCH] feat: cache deal states for most recent old/new tipset --- chain/events/state/mock/api.go | 69 ++++++++ chain/events/state/mock/state.go | 32 ++++ chain/events/state/mock/tipset.go | 27 +++ chain/events/state/predicates_test.go | 101 ++--------- markets/storageadapter/client.go | 23 ++- markets/storageadapter/dealstatematcher.go | 84 ++++++++++ .../storageadapter/dealstatematcher_test.go | 157 ++++++++++++++++++ markets/storageadapter/provider.go | 16 +- 8 files changed, 402 insertions(+), 107 deletions(-) create mode 100644 chain/events/state/mock/api.go create mode 100644 chain/events/state/mock/state.go create mode 100644 chain/events/state/mock/tipset.go create mode 100644 markets/storageadapter/dealstatematcher.go create mode 100644 markets/storageadapter/dealstatematcher_test.go diff --git a/chain/events/state/mock/api.go b/chain/events/state/mock/api.go new file mode 100644 index 000000000..4e8bcc94d --- /dev/null +++ b/chain/events/state/mock/api.go @@ -0,0 +1,69 @@ +package test + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/blockstore" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type MockAPI struct { + bs blockstore.Blockstore + + lk sync.Mutex + ts map[types.TipSetKey]*types.Actor + stateGetActorCalled int +} + +func NewMockAPI(bs blockstore.Blockstore) *MockAPI { + return &MockAPI{ + bs: bs, + ts: make(map[types.TipSetKey]*types.Actor), + } +} + +func (m *MockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + return m.bs.Has(c) +} + +func (m *MockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + blk, err := m.bs.Get(c) + if err != nil { + return nil, xerrors.Errorf("blockstore get: %w", err) + } + + return blk.RawData(), nil +} + +func (m *MockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + m.lk.Lock() + defer m.lk.Unlock() + + m.stateGetActorCalled++ + return m.ts[tsk], nil +} + +func (m *MockAPI) StateGetActorCallCount() int { + m.lk.Lock() + defer m.lk.Unlock() + + return m.stateGetActorCalled +} + +func (m *MockAPI) ResetCallCounts() { + m.lk.Lock() + defer m.lk.Unlock() + + m.stateGetActorCalled = 0 +} + +func (m *MockAPI) SetActor(tsk types.TipSetKey, act *types.Actor) { + m.lk.Lock() + defer m.lk.Unlock() + + m.ts[tsk] = act +} diff --git a/chain/events/state/mock/state.go b/chain/events/state/mock/state.go new file mode 100644 index 000000000..bac06b59f --- /dev/null +++ b/chain/events/state/mock/state.go @@ -0,0 +1,32 @@ +package test + +import ( + "context" + "testing" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/filecoin-project/specs-actors/v2/actors/util/adt" + "github.com/stretchr/testify/require" +) + +func CreateEmptyMarketState(t *testing.T, store adt.Store) *market.State { + emptyArrayCid, err := adt.MakeEmptyArray(store).Root() + require.NoError(t, err) + emptyMap, err := adt.MakeEmptyMap(store).Root() + require.NoError(t, err) + return market.ConstructState(emptyArrayCid, emptyMap, emptyMap) +} + +func CreateDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState) cid.Cid { + root := adt.MakeEmptyArray(store) + for dealID, dealState := range deals { + err := root.Set(uint64(dealID), dealState) + require.NoError(t, err) + } + rootCid, err := root.Root() + require.NoError(t, err) + return rootCid +} diff --git a/chain/events/state/mock/tipset.go b/chain/events/state/mock/tipset.go new file mode 100644 index 000000000..39d42d6e5 --- /dev/null +++ b/chain/events/state/mock/tipset.go @@ -0,0 +1,27 @@ +package test + +import ( + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +func MockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) { + return types.NewTipSet([]*types.BlockHeader{{ + Miner: minerAddr, + Height: 5, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + Timestamp: timestamp, + }}) +} diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 9b393f6e4..8fc93d9cd 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -4,21 +4,19 @@ import ( "context" "testing" + test "github.com/filecoin-project/lotus/chain/events/state/mock" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/go-bitfield" - "github.com/stretchr/testify/require" - "golang.org/x/xerrors" - "github.com/ipfs/go-cid" cbornode "github.com/ipfs/go-ipld-cbor" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-state-types/crypto" - builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" @@ -36,39 +34,6 @@ func init() { dummyCid, _ = cid.Parse("bafkqaaa") } -type mockAPI struct { - ts map[types.TipSetKey]*types.Actor - bs bstore.Blockstore -} - -func newMockAPI(bs bstore.Blockstore) *mockAPI { - return &mockAPI{ - bs: bs, - ts: make(map[types.TipSetKey]*types.Actor), - } -} - -func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { - return m.bs.Has(c) -} - -func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { - blk, err := m.bs.Get(c) - if err != nil { - return nil, xerrors.Errorf("blockstore get: %w", err) - } - - return blk.RawData(), nil -} - -func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { - return m.ts[tsk], nil -} - -func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { - m.ts[tsk] = act -} - func TestMarketPredicates(t *testing.T) { ctx := context.Background() bs := bstore.NewTemporarySync() @@ -177,14 +142,14 @@ func TestMarketPredicates(t *testing.T) { minerAddr, err := address.NewFromString("t00") require.NoError(t, err) - oldState, err := mockTipset(minerAddr, 1) + oldState, err := test.MockTipset(minerAddr, 1) require.NoError(t, err) - newState, err := mockTipset(minerAddr, 2) + newState, err := test.MockTipset(minerAddr, 2) require.NoError(t, err) - api := newMockAPI(bs) - api.setActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC}) - api.setActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC}) + api := test.NewMockAPI(bs) + api.SetActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC}) + api.SetActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC}) t.Run("deal ID predicate", func(t *testing.T) { preds := NewStatePredicates(api) @@ -239,7 +204,7 @@ func TestMarketPredicates(t *testing.T) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - marketState0 := createEmptyMarketState(t, store) + marketState0 := test.CreateEmptyMarketState(t, store) marketCid, err := store.Put(ctx, marketState0) require.NoError(t, err) marketState, err := market.Load(store, &types.Actor{ @@ -352,7 +317,7 @@ func TestMarketPredicates(t *testing.T) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - marketState0 := createEmptyMarketState(t, store) + marketState0 := test.CreateEmptyMarketState(t, store) marketCid, err := store.Put(ctx, marketState0) require.NoError(t, err) marketState, err := market.Load(store, &types.Actor{ @@ -394,14 +359,14 @@ func TestMinerSectorChange(t *testing.T) { newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3}) minerAddr := nextIDAddrF() - oldState, err := mockTipset(minerAddr, 1) + oldState, err := test.MockTipset(minerAddr, 1) require.NoError(t, err) - newState, err := mockTipset(minerAddr, 2) + newState, err := test.MockTipset(minerAddr, 2) require.NoError(t, err) - api := newMockAPI(bs) - api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID}) - api.setActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID}) + api := test.NewMockAPI(bs) + api.SetActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID}) + api.SetActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID}) preds := NewStatePredicates(api) @@ -449,29 +414,16 @@ func TestMinerSectorChange(t *testing.T) { require.Equal(t, si1Ext, sectorChanges.Extended[0].From) } -func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) { - return types.NewTipSet([]*types.BlockHeader{{ - Miner: minerAddr, - Height: 5, - ParentStateRoot: dummyCid, - Messages: dummyCid, - ParentMessageReceipts: dummyCid, - BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, - BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, - Timestamp: timestamp, - }}) -} - type balance struct { available abi.TokenAmount locked abi.TokenAmount } func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState, props map[abi.DealID]*market2.DealProposal, balances map[address.Address]balance) cid.Cid { - dealRootCid := createDealAMT(ctx, t, store, deals) + dealRootCid := test.CreateDealAMT(ctx, t, store, deals) propRootCid := createProposalAMT(ctx, t, store, props) balancesCids := createBalanceTable(ctx, t, store, balances) - state := createEmptyMarketState(t, store) + state := test.CreateEmptyMarketState(t, store) state.States = dealRootCid state.Proposals = propRootCid state.EscrowTable = balancesCids[0] @@ -482,25 +434,6 @@ func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deal return stateC } -func createEmptyMarketState(t *testing.T, store adt2.Store) *market2.State { - emptyArrayCid, err := adt2.MakeEmptyArray(store).Root() - require.NoError(t, err) - emptyMap, err := adt2.MakeEmptyMap(store).Root() - require.NoError(t, err) - return market2.ConstructState(emptyArrayCid, emptyMap, emptyMap) -} - -func createDealAMT(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid { - root := adt2.MakeEmptyArray(store) - for dealID, dealState := range deals { - err := root.Set(uint64(dealID), dealState) - require.NoError(t, err) - } - rootCid, err := root.Root() - require.NoError(t, err) - return rootCid -} - func createProposalAMT(ctx context.Context, t *testing.T, store adt2.Store, props map[abi.DealID]*market2.DealProposal) cid.Cid { root := adt2.MakeEmptyArray(store) for dealID, prop := range props { diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index f299dd4d5..e8e6fbcae 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -38,8 +38,9 @@ type ClientNodeAdapter struct { full.ChainAPI full.MpoolAPI - fm *market.FundMgr - ev *events.Events + fm *market.FundMgr + ev *events.Events + dsMatcher *dealStateMatcher } type clientApi struct { @@ -47,14 +48,16 @@ type clientApi struct { full.StateAPI } -func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode { +func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode { + capi := &clientApi{chain, stateapi} return &ClientNodeAdapter{ - StateAPI: state, + StateAPI: stateapi, ChainAPI: chain, MpoolAPI: mpool, - fm: fm, - ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), + fm: fm, + ev: events.NewEvents(context.TODO(), capi), + dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)), } } @@ -389,13 +392,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a } // Watch for state changes to the deal - preds := state.NewStatePredicates(c) - dealDiff := preds.OnStorageMarketActorChanged( - preds.OnDealStateChanged( - preds.DealStateChangedForIDs([]abi.DealID{dealID}))) - match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { - return dealDiff(ctx, oldTs.Key(), newTs.Key()) - } + match := c.dsMatcher.matcher(ctx, dealID) // Wait until after the end epoch for the deal and then timeout timeout := (sd.Proposal.EndEpoch - head.Height()) + 1 diff --git a/markets/storageadapter/dealstatematcher.go b/markets/storageadapter/dealstatematcher.go new file mode 100644 index 000000000..b8b47ef8e --- /dev/null +++ b/markets/storageadapter/dealstatematcher.go @@ -0,0 +1,84 @@ +package storageadapter + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-state-types/abi" + actorsmarket "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/lotus/chain/types" +) + +// dealStateMatcher caches the DealStates for the most recent +// old/new tipset combination +type dealStateMatcher struct { + preds *state.StatePredicates + + lk sync.Mutex + oldTsk types.TipSetKey + newTsk types.TipSetKey + oldDealStateRoot actorsmarket.DealStates + newDealStateRoot actorsmarket.DealStates +} + +func newDealStateMatcher(preds *state.StatePredicates) *dealStateMatcher { + return &dealStateMatcher{preds: preds} +} + +// matcher returns a function that checks if the state of the given dealID +// has changed. +// It caches the DealStates for the most recent old/new tipset combination. +func (mc *dealStateMatcher) matcher(ctx context.Context, dealID abi.DealID) events.StateMatchFunc { + // The function that is called to check if the deal state has changed for + // the target deal ID + dealStateChangedForID := mc.preds.DealStateChangedForIDs([]abi.DealID{dealID}) + + // The match function is called by the events API to check if there's + // been a state change for the deal with the target deal ID + match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { + mc.lk.Lock() + defer mc.lk.Unlock() + + // Check if we've already fetched the DealStates for the given tipsets + if mc.oldTsk == oldTs.Key() && mc.newTsk == newTs.Key() { + // If we fetch the DealStates and there is no difference between + // them, they are stored as nil. So we can just bail out. + if mc.oldDealStateRoot == nil || mc.newDealStateRoot == nil { + return false, nil, nil + } + + // Check if the deal state has changed for the target ID + return dealStateChangedForID(ctx, mc.oldDealStateRoot, mc.newDealStateRoot) + } + + // We haven't already fetched the DealStates for the given tipsets, so + // do so now + + // Replace dealStateChangedForID with a function that records the + // DealStates so that we can cache them + var oldDealStateRootSaved, newDealStateRootSaved actorsmarket.DealStates + recorder := func(ctx context.Context, oldDealStateRoot, newDealStateRoot actorsmarket.DealStates) (changed bool, user state.UserData, err error) { + // Record DealStates + oldDealStateRootSaved = oldDealStateRoot + newDealStateRootSaved = newDealStateRoot + + return dealStateChangedForID(ctx, oldDealStateRoot, newDealStateRoot) + } + + // Call the match function + dealDiff := mc.preds.OnStorageMarketActorChanged( + mc.preds.OnDealStateChanged(recorder)) + matched, data, err := dealDiff(ctx, oldTs.Key(), newTs.Key()) + + // Save the recorded DealStates for the tipsets + mc.oldTsk = oldTs.Key() + mc.newTsk = newTs.Key() + mc.oldDealStateRoot = oldDealStateRootSaved + mc.newDealStateRoot = newDealStateRootSaved + + return matched, data, err + } + return match +} diff --git a/markets/storageadapter/dealstatematcher_test.go b/markets/storageadapter/dealstatematcher_test.go new file mode 100644 index 000000000..d0c5277d5 --- /dev/null +++ b/markets/storageadapter/dealstatematcher_test.go @@ -0,0 +1,157 @@ +package storageadapter + +import ( + "context" + "testing" + + "github.com/filecoin-project/lotus/chain/events" + "golang.org/x/sync/errgroup" + + cbornode "github.com/ipfs/go-ipld-cbor" + + adt2 "github.com/filecoin-project/specs-actors/v2/actors/util/adt" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + test "github.com/filecoin-project/lotus/chain/events/state/mock" + bstore "github.com/filecoin-project/lotus/lib/blockstore" + builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" + + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/lotus/chain/types" +) + +func TestDealStateMatcher(t *testing.T) { + ctx := context.Background() + bs := bstore.NewTemporarySync() + store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs)) + + deal1 := &market2.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + } + deal2 := &market2.DealState{ + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + } + deal3 := &market2.DealState{ + SectorStartEpoch: 7, + LastUpdatedEpoch: 8, + } + deals1 := map[abi.DealID]*market2.DealState{ + abi.DealID(1): deal1, + } + deals2 := map[abi.DealID]*market2.DealState{ + abi.DealID(1): deal2, + } + deals3 := map[abi.DealID]*market2.DealState{ + abi.DealID(1): deal3, + } + + deal1StateC := createMarketState(ctx, t, store, deals1) + deal2StateC := createMarketState(ctx, t, store, deals2) + deal3StateC := createMarketState(ctx, t, store, deals3) + + minerAddr, err := address.NewFromString("t00") + require.NoError(t, err) + ts1, err := test.MockTipset(minerAddr, 1) + require.NoError(t, err) + ts2, err := test.MockTipset(minerAddr, 2) + require.NoError(t, err) + ts3, err := test.MockTipset(minerAddr, 3) + require.NoError(t, err) + + api := test.NewMockAPI(bs) + api.SetActor(ts1.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal1StateC}) + api.SetActor(ts2.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal2StateC}) + api.SetActor(ts3.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal3StateC}) + + t.Run("caching", func(t *testing.T) { + dsm := newDealStateMatcher(state.NewStatePredicates(api)) + matcher := dsm.matcher(ctx, abi.DealID(1)) + + // Call matcher with tipsets that have the same state + ok, stateChange, err := matcher(ts1, ts1) + require.NoError(t, err) + require.False(t, ok) + require.Nil(t, stateChange) + // Should call StateGetActor once for each tipset + require.Equal(t, 2, api.StateGetActorCallCount()) + + // Call matcher with tipsets that have different state + api.ResetCallCounts() + ok, stateChange, err = matcher(ts1, ts2) + require.NoError(t, err) + require.True(t, ok) + require.NotNil(t, stateChange) + // Should call StateGetActor once for each tipset + require.Equal(t, 2, api.StateGetActorCallCount()) + + // Call matcher again with the same tipsets as above, should be cached + api.ResetCallCounts() + ok, stateChange, err = matcher(ts1, ts2) + require.NoError(t, err) + require.True(t, ok) + require.NotNil(t, stateChange) + // Should not call StateGetActor (because it should hit the cache) + require.Equal(t, 0, api.StateGetActorCallCount()) + + // Call matcher with different tipsets, should not be cached + api.ResetCallCounts() + ok, stateChange, err = matcher(ts2, ts3) + require.NoError(t, err) + require.True(t, ok) + require.NotNil(t, stateChange) + // Should call StateGetActor once for each tipset + require.Equal(t, 2, api.StateGetActorCallCount()) + }) + + t.Run("parallel", func(t *testing.T) { + api.ResetCallCounts() + dsm := newDealStateMatcher(state.NewStatePredicates(api)) + matcher := dsm.matcher(ctx, abi.DealID(1)) + + // Call matcher with lots of go-routines in parallel + var eg errgroup.Group + res := make([]struct { + ok bool + stateChange events.StateChange + }, 20) + for i := 0; i < len(res); i++ { + i := i + eg.Go(func() error { + ok, stateChange, err := matcher(ts1, ts2) + res[i].ok = ok + res[i].stateChange = stateChange + return err + }) + } + err := eg.Wait() + require.NoError(t, err) + + // All go-routines should have got the same (cached) result + for i := 1; i < len(res); i++ { + require.Equal(t, res[i].ok, res[i-1].ok) + require.Equal(t, res[i].stateChange, res[i-1].stateChange) + } + + // Only one go-routine should have called StateGetActor + // (once for each tipset) + require.Equal(t, 2, api.StateGetActorCallCount()) + }) +} + +func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid { + dealRootCid := test.CreateDealAMT(ctx, t, store, deals) + state := test.CreateEmptyMarketState(t, store) + state.States = dealRootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index ce7c8e917..8debbd198 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -51,6 +51,7 @@ type ProviderNodeAdapter struct { ev *events.Events publishSpec, addBalanceSpec *api.MessageSendSpec + dsMatcher *dealStateMatcher } func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { @@ -58,9 +59,10 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA na := &ProviderNodeAdapter{ FullNode: full, - dag: dag, - secb: secb, - ev: events.NewEvents(context.TODO(), full), + dag: dag, + secb: secb, + ev: events.NewEvents(context.TODO(), full), + dsMatcher: newDealStateMatcher(state.NewStatePredicates(full)), } if fc != nil { na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)} @@ -461,13 +463,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID } // Watch for state changes to the deal - preds := state.NewStatePredicates(n) - dealDiff := preds.OnStorageMarketActorChanged( - preds.OnDealStateChanged( - preds.DealStateChangedForIDs([]abi.DealID{dealID}))) - match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { - return dealDiff(ctx, oldTs.Key(), newTs.Key()) - } + match := n.dsMatcher.matcher(ctx, dealID) // Wait until after the end epoch for the deal and then timeout timeout := (sd.Proposal.EndEpoch - head.Height()) + 1