feat: cache deal states for most recent old/new tipset
This commit is contained in:
parent
3d02dba5dc
commit
f79652c28c
69
chain/events/state/mock/api.go
Normal file
69
chain/events/state/mock/api.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MockAPI struct {
|
||||||
|
bs blockstore.Blockstore
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
ts map[types.TipSetKey]*types.Actor
|
||||||
|
stateGetActorCalled int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMockAPI(bs blockstore.Blockstore) *MockAPI {
|
||||||
|
return &MockAPI{
|
||||||
|
bs: bs,
|
||||||
|
ts: make(map[types.TipSetKey]*types.Actor),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
|
||||||
|
return m.bs.Has(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
|
||||||
|
blk, err := m.bs.Get(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("blockstore get: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blk.RawData(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
|
||||||
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
|
m.stateGetActorCalled++
|
||||||
|
return m.ts[tsk], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) StateGetActorCallCount() int {
|
||||||
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
|
return m.stateGetActorCalled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) ResetCallCounts() {
|
||||||
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
|
m.stateGetActorCalled = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAPI) SetActor(tsk types.TipSetKey, act *types.Actor) {
|
||||||
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
|
m.ts[tsk] = act
|
||||||
|
}
|
32
chain/events/state/mock/state.go
Normal file
32
chain/events/state/mock/state.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/specs-actors/v2/actors/util/adt"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CreateEmptyMarketState(t *testing.T, store adt.Store) *market.State {
|
||||||
|
emptyArrayCid, err := adt.MakeEmptyArray(store).Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
emptyMap, err := adt.MakeEmptyMap(store).Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState) cid.Cid {
|
||||||
|
root := adt.MakeEmptyArray(store)
|
||||||
|
for dealID, dealState := range deals {
|
||||||
|
err := root.Set(uint64(dealID), dealState)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
rootCid, err := root.Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
return rootCid
|
||||||
|
}
|
27
chain/events/state/mock/tipset.go
Normal file
27
chain/events/state/mock/tipset.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
)
|
||||||
|
|
||||||
|
var dummyCid cid.Cid
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
dummyCid, _ = cid.Parse("bafkqaaa")
|
||||||
|
}
|
||||||
|
|
||||||
|
func MockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
|
||||||
|
return types.NewTipSet([]*types.BlockHeader{{
|
||||||
|
Miner: minerAddr,
|
||||||
|
Height: 5,
|
||||||
|
ParentStateRoot: dummyCid,
|
||||||
|
Messages: dummyCid,
|
||||||
|
ParentMessageReceipts: dummyCid,
|
||||||
|
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
|
||||||
|
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
|
||||||
|
Timestamp: timestamp,
|
||||||
|
}})
|
||||||
|
}
|
@ -4,21 +4,19 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-bitfield"
|
"github.com/filecoin-project/go-bitfield"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbornode "github.com/ipfs/go-ipld-cbor"
|
cbornode "github.com/ipfs/go-ipld-cbor"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
|
||||||
|
|
||||||
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||||
@ -36,39 +34,6 @@ func init() {
|
|||||||
dummyCid, _ = cid.Parse("bafkqaaa")
|
dummyCid, _ = cid.Parse("bafkqaaa")
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockAPI struct {
|
|
||||||
ts map[types.TipSetKey]*types.Actor
|
|
||||||
bs bstore.Blockstore
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMockAPI(bs bstore.Blockstore) *mockAPI {
|
|
||||||
return &mockAPI{
|
|
||||||
bs: bs,
|
|
||||||
ts: make(map[types.TipSetKey]*types.Actor),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
|
|
||||||
return m.bs.Has(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
|
|
||||||
blk, err := m.bs.Get(c)
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("blockstore get: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return blk.RawData(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
|
|
||||||
return m.ts[tsk], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) {
|
|
||||||
m.ts[tsk] = act
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMarketPredicates(t *testing.T) {
|
func TestMarketPredicates(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
bs := bstore.NewTemporarySync()
|
bs := bstore.NewTemporarySync()
|
||||||
@ -177,14 +142,14 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
|
|
||||||
minerAddr, err := address.NewFromString("t00")
|
minerAddr, err := address.NewFromString("t00")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
oldState, err := mockTipset(minerAddr, 1)
|
oldState, err := test.MockTipset(minerAddr, 1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
newState, err := mockTipset(minerAddr, 2)
|
newState, err := test.MockTipset(minerAddr, 2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
api := newMockAPI(bs)
|
api := test.NewMockAPI(bs)
|
||||||
api.setActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
|
api.SetActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
|
||||||
api.setActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})
|
api.SetActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})
|
||||||
|
|
||||||
t.Run("deal ID predicate", func(t *testing.T) {
|
t.Run("deal ID predicate", func(t *testing.T) {
|
||||||
preds := NewStatePredicates(api)
|
preds := NewStatePredicates(api)
|
||||||
@ -239,7 +204,7 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
t.Fatal("No state change so this should not be called")
|
t.Fatal("No state change so this should not be called")
|
||||||
return false, nil, nil
|
return false, nil, nil
|
||||||
})
|
})
|
||||||
marketState0 := createEmptyMarketState(t, store)
|
marketState0 := test.CreateEmptyMarketState(t, store)
|
||||||
marketCid, err := store.Put(ctx, marketState0)
|
marketCid, err := store.Put(ctx, marketState0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
marketState, err := market.Load(store, &types.Actor{
|
marketState, err := market.Load(store, &types.Actor{
|
||||||
@ -352,7 +317,7 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
t.Fatal("No state change so this should not be called")
|
t.Fatal("No state change so this should not be called")
|
||||||
return false, nil, nil
|
return false, nil, nil
|
||||||
})
|
})
|
||||||
marketState0 := createEmptyMarketState(t, store)
|
marketState0 := test.CreateEmptyMarketState(t, store)
|
||||||
marketCid, err := store.Put(ctx, marketState0)
|
marketCid, err := store.Put(ctx, marketState0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
marketState, err := market.Load(store, &types.Actor{
|
marketState, err := market.Load(store, &types.Actor{
|
||||||
@ -394,14 +359,14 @@ func TestMinerSectorChange(t *testing.T) {
|
|||||||
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
|
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
|
||||||
|
|
||||||
minerAddr := nextIDAddrF()
|
minerAddr := nextIDAddrF()
|
||||||
oldState, err := mockTipset(minerAddr, 1)
|
oldState, err := test.MockTipset(minerAddr, 1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
newState, err := mockTipset(minerAddr, 2)
|
newState, err := test.MockTipset(minerAddr, 2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
api := newMockAPI(bs)
|
api := test.NewMockAPI(bs)
|
||||||
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
|
api.SetActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
|
||||||
api.setActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})
|
api.SetActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})
|
||||||
|
|
||||||
preds := NewStatePredicates(api)
|
preds := NewStatePredicates(api)
|
||||||
|
|
||||||
@ -449,29 +414,16 @@ func TestMinerSectorChange(t *testing.T) {
|
|||||||
require.Equal(t, si1Ext, sectorChanges.Extended[0].From)
|
require.Equal(t, si1Ext, sectorChanges.Extended[0].From)
|
||||||
}
|
}
|
||||||
|
|
||||||
func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
|
|
||||||
return types.NewTipSet([]*types.BlockHeader{{
|
|
||||||
Miner: minerAddr,
|
|
||||||
Height: 5,
|
|
||||||
ParentStateRoot: dummyCid,
|
|
||||||
Messages: dummyCid,
|
|
||||||
ParentMessageReceipts: dummyCid,
|
|
||||||
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
|
|
||||||
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}})
|
|
||||||
}
|
|
||||||
|
|
||||||
type balance struct {
|
type balance struct {
|
||||||
available abi.TokenAmount
|
available abi.TokenAmount
|
||||||
locked abi.TokenAmount
|
locked abi.TokenAmount
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState, props map[abi.DealID]*market2.DealProposal, balances map[address.Address]balance) cid.Cid {
|
func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState, props map[abi.DealID]*market2.DealProposal, balances map[address.Address]balance) cid.Cid {
|
||||||
dealRootCid := createDealAMT(ctx, t, store, deals)
|
dealRootCid := test.CreateDealAMT(ctx, t, store, deals)
|
||||||
propRootCid := createProposalAMT(ctx, t, store, props)
|
propRootCid := createProposalAMT(ctx, t, store, props)
|
||||||
balancesCids := createBalanceTable(ctx, t, store, balances)
|
balancesCids := createBalanceTable(ctx, t, store, balances)
|
||||||
state := createEmptyMarketState(t, store)
|
state := test.CreateEmptyMarketState(t, store)
|
||||||
state.States = dealRootCid
|
state.States = dealRootCid
|
||||||
state.Proposals = propRootCid
|
state.Proposals = propRootCid
|
||||||
state.EscrowTable = balancesCids[0]
|
state.EscrowTable = balancesCids[0]
|
||||||
@ -482,25 +434,6 @@ func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deal
|
|||||||
return stateC
|
return stateC
|
||||||
}
|
}
|
||||||
|
|
||||||
func createEmptyMarketState(t *testing.T, store adt2.Store) *market2.State {
|
|
||||||
emptyArrayCid, err := adt2.MakeEmptyArray(store).Root()
|
|
||||||
require.NoError(t, err)
|
|
||||||
emptyMap, err := adt2.MakeEmptyMap(store).Root()
|
|
||||||
require.NoError(t, err)
|
|
||||||
return market2.ConstructState(emptyArrayCid, emptyMap, emptyMap)
|
|
||||||
}
|
|
||||||
|
|
||||||
func createDealAMT(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid {
|
|
||||||
root := adt2.MakeEmptyArray(store)
|
|
||||||
for dealID, dealState := range deals {
|
|
||||||
err := root.Set(uint64(dealID), dealState)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
rootCid, err := root.Root()
|
|
||||||
require.NoError(t, err)
|
|
||||||
return rootCid
|
|
||||||
}
|
|
||||||
|
|
||||||
func createProposalAMT(ctx context.Context, t *testing.T, store adt2.Store, props map[abi.DealID]*market2.DealProposal) cid.Cid {
|
func createProposalAMT(ctx context.Context, t *testing.T, store adt2.Store, props map[abi.DealID]*market2.DealProposal) cid.Cid {
|
||||||
root := adt2.MakeEmptyArray(store)
|
root := adt2.MakeEmptyArray(store)
|
||||||
for dealID, prop := range props {
|
for dealID, prop := range props {
|
||||||
|
@ -38,8 +38,9 @@ type ClientNodeAdapter struct {
|
|||||||
full.ChainAPI
|
full.ChainAPI
|
||||||
full.MpoolAPI
|
full.MpoolAPI
|
||||||
|
|
||||||
fm *market.FundMgr
|
fm *market.FundMgr
|
||||||
ev *events.Events
|
ev *events.Events
|
||||||
|
dsMatcher *dealStateMatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientApi struct {
|
type clientApi struct {
|
||||||
@ -47,14 +48,16 @@ type clientApi struct {
|
|||||||
full.StateAPI
|
full.StateAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
|
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
|
||||||
|
capi := &clientApi{chain, stateapi}
|
||||||
return &ClientNodeAdapter{
|
return &ClientNodeAdapter{
|
||||||
StateAPI: state,
|
StateAPI: stateapi,
|
||||||
ChainAPI: chain,
|
ChainAPI: chain,
|
||||||
MpoolAPI: mpool,
|
MpoolAPI: mpool,
|
||||||
|
|
||||||
fm: fm,
|
fm: fm,
|
||||||
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
|
ev: events.NewEvents(context.TODO(), capi),
|
||||||
|
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,13 +392,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch for state changes to the deal
|
// Watch for state changes to the deal
|
||||||
preds := state.NewStatePredicates(c)
|
match := c.dsMatcher.matcher(ctx, dealID)
|
||||||
dealDiff := preds.OnStorageMarketActorChanged(
|
|
||||||
preds.OnDealStateChanged(
|
|
||||||
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
|
|
||||||
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
|
||||||
return dealDiff(ctx, oldTs.Key(), newTs.Key())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait until after the end epoch for the deal and then timeout
|
// Wait until after the end epoch for the deal and then timeout
|
||||||
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
||||||
|
84
markets/storageadapter/dealstatematcher.go
Normal file
84
markets/storageadapter/dealstatematcher.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package storageadapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
actorsmarket "github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events/state"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dealStateMatcher caches the DealStates for the most recent
|
||||||
|
// old/new tipset combination
|
||||||
|
type dealStateMatcher struct {
|
||||||
|
preds *state.StatePredicates
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
oldTsk types.TipSetKey
|
||||||
|
newTsk types.TipSetKey
|
||||||
|
oldDealStateRoot actorsmarket.DealStates
|
||||||
|
newDealStateRoot actorsmarket.DealStates
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDealStateMatcher(preds *state.StatePredicates) *dealStateMatcher {
|
||||||
|
return &dealStateMatcher{preds: preds}
|
||||||
|
}
|
||||||
|
|
||||||
|
// matcher returns a function that checks if the state of the given dealID
|
||||||
|
// has changed.
|
||||||
|
// It caches the DealStates for the most recent old/new tipset combination.
|
||||||
|
func (mc *dealStateMatcher) matcher(ctx context.Context, dealID abi.DealID) events.StateMatchFunc {
|
||||||
|
// The function that is called to check if the deal state has changed for
|
||||||
|
// the target deal ID
|
||||||
|
dealStateChangedForID := mc.preds.DealStateChangedForIDs([]abi.DealID{dealID})
|
||||||
|
|
||||||
|
// The match function is called by the events API to check if there's
|
||||||
|
// been a state change for the deal with the target deal ID
|
||||||
|
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
||||||
|
mc.lk.Lock()
|
||||||
|
defer mc.lk.Unlock()
|
||||||
|
|
||||||
|
// Check if we've already fetched the DealStates for the given tipsets
|
||||||
|
if mc.oldTsk == oldTs.Key() && mc.newTsk == newTs.Key() {
|
||||||
|
// If we fetch the DealStates and there is no difference between
|
||||||
|
// them, they are stored as nil. So we can just bail out.
|
||||||
|
if mc.oldDealStateRoot == nil || mc.newDealStateRoot == nil {
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the deal state has changed for the target ID
|
||||||
|
return dealStateChangedForID(ctx, mc.oldDealStateRoot, mc.newDealStateRoot)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We haven't already fetched the DealStates for the given tipsets, so
|
||||||
|
// do so now
|
||||||
|
|
||||||
|
// Replace dealStateChangedForID with a function that records the
|
||||||
|
// DealStates so that we can cache them
|
||||||
|
var oldDealStateRootSaved, newDealStateRootSaved actorsmarket.DealStates
|
||||||
|
recorder := func(ctx context.Context, oldDealStateRoot, newDealStateRoot actorsmarket.DealStates) (changed bool, user state.UserData, err error) {
|
||||||
|
// Record DealStates
|
||||||
|
oldDealStateRootSaved = oldDealStateRoot
|
||||||
|
newDealStateRootSaved = newDealStateRoot
|
||||||
|
|
||||||
|
return dealStateChangedForID(ctx, oldDealStateRoot, newDealStateRoot)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the match function
|
||||||
|
dealDiff := mc.preds.OnStorageMarketActorChanged(
|
||||||
|
mc.preds.OnDealStateChanged(recorder))
|
||||||
|
matched, data, err := dealDiff(ctx, oldTs.Key(), newTs.Key())
|
||||||
|
|
||||||
|
// Save the recorded DealStates for the tipsets
|
||||||
|
mc.oldTsk = oldTs.Key()
|
||||||
|
mc.newTsk = newTs.Key()
|
||||||
|
mc.oldDealStateRoot = oldDealStateRootSaved
|
||||||
|
mc.newDealStateRoot = newDealStateRootSaved
|
||||||
|
|
||||||
|
return matched, data, err
|
||||||
|
}
|
||||||
|
return match
|
||||||
|
}
|
157
markets/storageadapter/dealstatematcher_test.go
Normal file
157
markets/storageadapter/dealstatematcher_test.go
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
package storageadapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
cbornode "github.com/ipfs/go-ipld-cbor"
|
||||||
|
|
||||||
|
adt2 "github.com/filecoin-project/specs-actors/v2/actors/util/adt"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
test "github.com/filecoin-project/lotus/chain/events/state/mock"
|
||||||
|
bstore "github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
|
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
||||||
|
|
||||||
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/events/state"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDealStateMatcher(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
bs := bstore.NewTemporarySync()
|
||||||
|
store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs))
|
||||||
|
|
||||||
|
deal1 := &market2.DealState{
|
||||||
|
SectorStartEpoch: 1,
|
||||||
|
LastUpdatedEpoch: 2,
|
||||||
|
}
|
||||||
|
deal2 := &market2.DealState{
|
||||||
|
SectorStartEpoch: 4,
|
||||||
|
LastUpdatedEpoch: 5,
|
||||||
|
}
|
||||||
|
deal3 := &market2.DealState{
|
||||||
|
SectorStartEpoch: 7,
|
||||||
|
LastUpdatedEpoch: 8,
|
||||||
|
}
|
||||||
|
deals1 := map[abi.DealID]*market2.DealState{
|
||||||
|
abi.DealID(1): deal1,
|
||||||
|
}
|
||||||
|
deals2 := map[abi.DealID]*market2.DealState{
|
||||||
|
abi.DealID(1): deal2,
|
||||||
|
}
|
||||||
|
deals3 := map[abi.DealID]*market2.DealState{
|
||||||
|
abi.DealID(1): deal3,
|
||||||
|
}
|
||||||
|
|
||||||
|
deal1StateC := createMarketState(ctx, t, store, deals1)
|
||||||
|
deal2StateC := createMarketState(ctx, t, store, deals2)
|
||||||
|
deal3StateC := createMarketState(ctx, t, store, deals3)
|
||||||
|
|
||||||
|
minerAddr, err := address.NewFromString("t00")
|
||||||
|
require.NoError(t, err)
|
||||||
|
ts1, err := test.MockTipset(minerAddr, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
ts2, err := test.MockTipset(minerAddr, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
ts3, err := test.MockTipset(minerAddr, 3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
api := test.NewMockAPI(bs)
|
||||||
|
api.SetActor(ts1.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal1StateC})
|
||||||
|
api.SetActor(ts2.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal2StateC})
|
||||||
|
api.SetActor(ts3.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal3StateC})
|
||||||
|
|
||||||
|
t.Run("caching", func(t *testing.T) {
|
||||||
|
dsm := newDealStateMatcher(state.NewStatePredicates(api))
|
||||||
|
matcher := dsm.matcher(ctx, abi.DealID(1))
|
||||||
|
|
||||||
|
// Call matcher with tipsets that have the same state
|
||||||
|
ok, stateChange, err := matcher(ts1, ts1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
require.Nil(t, stateChange)
|
||||||
|
// Should call StateGetActor once for each tipset
|
||||||
|
require.Equal(t, 2, api.StateGetActorCallCount())
|
||||||
|
|
||||||
|
// Call matcher with tipsets that have different state
|
||||||
|
api.ResetCallCounts()
|
||||||
|
ok, stateChange, err = matcher(ts1, ts2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, stateChange)
|
||||||
|
// Should call StateGetActor once for each tipset
|
||||||
|
require.Equal(t, 2, api.StateGetActorCallCount())
|
||||||
|
|
||||||
|
// Call matcher again with the same tipsets as above, should be cached
|
||||||
|
api.ResetCallCounts()
|
||||||
|
ok, stateChange, err = matcher(ts1, ts2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, stateChange)
|
||||||
|
// Should not call StateGetActor (because it should hit the cache)
|
||||||
|
require.Equal(t, 0, api.StateGetActorCallCount())
|
||||||
|
|
||||||
|
// Call matcher with different tipsets, should not be cached
|
||||||
|
api.ResetCallCounts()
|
||||||
|
ok, stateChange, err = matcher(ts2, ts3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, stateChange)
|
||||||
|
// Should call StateGetActor once for each tipset
|
||||||
|
require.Equal(t, 2, api.StateGetActorCallCount())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("parallel", func(t *testing.T) {
|
||||||
|
api.ResetCallCounts()
|
||||||
|
dsm := newDealStateMatcher(state.NewStatePredicates(api))
|
||||||
|
matcher := dsm.matcher(ctx, abi.DealID(1))
|
||||||
|
|
||||||
|
// Call matcher with lots of go-routines in parallel
|
||||||
|
var eg errgroup.Group
|
||||||
|
res := make([]struct {
|
||||||
|
ok bool
|
||||||
|
stateChange events.StateChange
|
||||||
|
}, 20)
|
||||||
|
for i := 0; i < len(res); i++ {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
ok, stateChange, err := matcher(ts1, ts2)
|
||||||
|
res[i].ok = ok
|
||||||
|
res[i].stateChange = stateChange
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err := eg.Wait()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// All go-routines should have got the same (cached) result
|
||||||
|
for i := 1; i < len(res); i++ {
|
||||||
|
require.Equal(t, res[i].ok, res[i-1].ok)
|
||||||
|
require.Equal(t, res[i].stateChange, res[i-1].stateChange)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only one go-routine should have called StateGetActor
|
||||||
|
// (once for each tipset)
|
||||||
|
require.Equal(t, 2, api.StateGetActorCallCount())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid {
|
||||||
|
dealRootCid := test.CreateDealAMT(ctx, t, store, deals)
|
||||||
|
state := test.CreateEmptyMarketState(t, store)
|
||||||
|
state.States = dealRootCid
|
||||||
|
|
||||||
|
stateC, err := store.Put(ctx, state)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return stateC
|
||||||
|
}
|
@ -51,6 +51,7 @@ type ProviderNodeAdapter struct {
|
|||||||
ev *events.Events
|
ev *events.Events
|
||||||
|
|
||||||
publishSpec, addBalanceSpec *api.MessageSendSpec
|
publishSpec, addBalanceSpec *api.MessageSendSpec
|
||||||
|
dsMatcher *dealStateMatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
||||||
@ -58,9 +59,10 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA
|
|||||||
na := &ProviderNodeAdapter{
|
na := &ProviderNodeAdapter{
|
||||||
FullNode: full,
|
FullNode: full,
|
||||||
|
|
||||||
dag: dag,
|
dag: dag,
|
||||||
secb: secb,
|
secb: secb,
|
||||||
ev: events.NewEvents(context.TODO(), full),
|
ev: events.NewEvents(context.TODO(), full),
|
||||||
|
dsMatcher: newDealStateMatcher(state.NewStatePredicates(full)),
|
||||||
}
|
}
|
||||||
if fc != nil {
|
if fc != nil {
|
||||||
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
|
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
|
||||||
@ -461,13 +463,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch for state changes to the deal
|
// Watch for state changes to the deal
|
||||||
preds := state.NewStatePredicates(n)
|
match := n.dsMatcher.matcher(ctx, dealID)
|
||||||
dealDiff := preds.OnStorageMarketActorChanged(
|
|
||||||
preds.OnDealStateChanged(
|
|
||||||
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
|
|
||||||
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
|
||||||
return dealDiff(ctx, oldTs.Key(), newTs.Key())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait until after the end epoch for the deal and then timeout
|
// Wait until after the end epoch for the deal and then timeout
|
||||||
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
|
||||||
|
Loading…
Reference in New Issue
Block a user