feat: add miner sector predicate and test

-polish: OnActorStateChanged operate over TipSetKey
    - it was calling key() interally and tipsetkeys are cheaper to get than
the full tipset
  -polish: improve predicate method names
This commit is contained in:
frrist 2020-07-01 18:20:54 -07:00
parent cf6ac44b6e
commit 3c6e46cd70
2 changed files with 304 additions and 29 deletions

View File

@ -3,18 +3,23 @@ package state
import (
"context"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
)
// UserData is the data returned from the DiffFunc
// UserData is the data returned from the DiffTipSetKeyFunc
type UserData interface{}
// ChainAPI abstracts out calls made by this class to external APIs
@ -36,22 +41,22 @@ func NewStatePredicates(api ChainAPI) *StatePredicates {
}
}
// DiffFunc check if there's a change form oldState to newState, and returns
// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns
// - changed: was there a change
// - user: user-defined data representing the state change
// - err
type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error)
type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error)
type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
// OnActorStateChanged calls diffStateFunc when the state changes for the given actor
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc {
return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key())
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc {
return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState)
if err != nil {
return false, nil, err
}
newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key())
newActor, err := sp.api.StateGetActor(ctx, addr, newState)
if err != nil {
return false, nil, err
}
@ -66,7 +71,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu
type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error)
// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc {
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState market.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
@ -135,3 +140,123 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal
return false, nil, nil
}
}
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState miner.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState miner.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffMinerActorState(ctx, &oldState, &newState)
})
}
type MinerSectorChanges struct {
Added []miner.SectorOnChainInfo
Extended []SectorExtensions
Removed []miner.SectorOnChainInfo
}
type SectorExtensions struct {
From miner.SectorOnChainInfo
To miner.SectorOnChainInfo
}
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: context.TODO(),
cst: sp.cst,
}
sectorChanges := &MinerSectorChanges{
Added: []miner.SectorOnChainInfo{},
Extended: []SectorExtensions{},
Removed: []miner.SectorOnChainInfo{},
}
// no sector changes
if oldState.Sectors.Equals(newState.Sectors) {
return false, nil, nil
}
oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors)
if err != nil {
return false, nil, err
}
newSectors, err := adt.AsArray(ctxStore, newState.Sectors)
if err != nil {
return false, nil, err
}
var osi miner.SectorOnChainInfo
// find all sectors that were extended or removed
if err := oldSectors.ForEach(&osi, func(i int64) error {
var nsi miner.SectorOnChainInfo
found, err := newSectors.Get(uint64(osi.Info.SectorNumber), &nsi)
if err != nil {
return err
}
if !found {
sectorChanges.Removed = append(sectorChanges.Removed, osi)
return nil
}
if nsi.Info.Expiration != osi.Info.Expiration {
sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{
From: osi,
To: nsi,
})
}
// we don't update miners state filed with `newSectors.Root()` so this operation is safe.
if err := newSectors.Delete(uint64(osi.Info.SectorNumber)); err != nil {
return err
}
return nil
}); err != nil {
return false, nil, err
}
// all sectors that remain in newSectors are new
var nsi miner.SectorOnChainInfo
if err := newSectors.ForEach(&nsi, func(i int64) error {
sectorChanges.Added = append(sectorChanges.Added, nsi)
return nil
}); err != nil {
return false, nil, err
}
// nothing changed
if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 {
return false, nil, nil
}
return true, sectorChanges, nil
}
}
type contextStore struct {
ctx context.Context
cst *cbor.BasicIpldStore
}
func (cs *contextStore) Context() context.Context {
return cs.ctx
}
func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
return cs.cst.Get(ctx, c, out)
}
func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cs.cst.Put(ctx, v)
}

View File

@ -4,23 +4,26 @@ import (
"context"
"testing"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-hamt-ipld"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-hamt-ipld"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/filecoin-project/lotus/chain/types"
)
var dummyCid cid.Cid
@ -112,12 +115,12 @@ func TestPredicates(t *testing.T) {
diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds)))
// Diff a state against itself: expect no change
changed, _, err := diffFn(ctx, oldState, oldState)
changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
// Diff old state against new state
changed, val, err := diffFn(ctx, oldState, newState)
changed, val, err := diffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, changed)
@ -142,7 +145,7 @@ func TestPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
changed, _, err = actorDiffFn(ctx, oldState, oldState)
changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
@ -157,6 +160,87 @@ func TestPredicates(t *testing.T) {
require.False(t, changed)
}
func TestMinerSectorChange(t *testing.T) {
ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs)
nextID := uint64(0)
nextIDAddrF := func() address.Address {
defer func() { nextID++ }()
return tutils.NewIDAddr(t, nextID)
}
owner, worker := nextIDAddrF(), nextIDAddrF()
si0 := newSectorOnChainInfo(0, tutils.MakeCID("0"), big.NewInt(0), abi.ChainEpoch(0), abi.ChainEpoch(10))
si1 := newSectorOnChainInfo(1, tutils.MakeCID("1"), big.NewInt(1), abi.ChainEpoch(1), abi.ChainEpoch(11))
si2 := newSectorOnChainInfo(2, tutils.MakeCID("2"), big.NewInt(2), abi.ChainEpoch(2), abi.ChainEpoch(11))
oldMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si0, si1, si2})
si3 := newSectorOnChainInfo(3, tutils.MakeCID("3"), big.NewInt(3), abi.ChainEpoch(3), abi.ChainEpoch(12))
// 0 delete
// 1 extend
// 2 same
// 3 added
si1Ext := si1
si1Ext.Info.Expiration++
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
minerAddr := nextIDAddrF()
oldState, err := mockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC})
api.setActor(newState.Key(), &types.Actor{Head: newMinerC})
preds := NewStatePredicates(api)
minerDiffFn := preds.OnMinerActorChange(minerAddr, preds.OnMinerSectorChange())
change, val, err := minerDiffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok := val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si3)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si0)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].From, si1)
require.Equal(t, sectorChanges.Extended[0].To, si1Ext)
change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, change)
require.Nil(t, val)
change, val, err = minerDiffFn(ctx, newState.Key(), oldState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok = val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si0)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si3)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].To, si1)
require.Equal(t, sectorChanges.Extended[0].From, si1Ext)
}
func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: miner,
@ -171,7 +255,7 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error)
}
func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
rootCid := createAMT(ctx, t, store, deals)
rootCid := createDealAMT(ctx, t, store, deals)
state := createEmptyMarketState(t, store)
state.States = rootCid
@ -189,7 +273,7 @@ func createEmptyMarketState(t *testing.T, store *cbornode.BasicIpldStore) *marke
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}
func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
root := amt.NewAMT(store)
for dealID, dealState := range deals {
err := root.Set(ctx, uint64(dealID), dealState)
@ -199,3 +283,69 @@ func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore
require.NoError(t, err)
return rootCid
}
func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
rootCid := createSectorsAMT(ctx, t, store, sectors)
state := createEmptyMinerState(ctx, t, store, owner, worker)
state.Sectors = rootCid
stateC, err := store.Put(ctx, state)
require.NoError(t, err)
return stateC
}
func createEmptyMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address) *miner.State {
emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO())
require.NoError(t, err)
emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5)))
require.NoError(t, err)
emptyDeadlines := miner.ConstructDeadlines()
emptyDeadlinesCid, err := store.Put(context.Background(), emptyDeadlines)
require.NoError(t, err)
state, err := miner.ConstructState(emptyArrayCid, emptyMap, emptyDeadlinesCid, owner, worker, abi.PeerID{'1'}, nil, abi.RegisteredSealProof_StackedDrg64GiBV1, 0)
require.NoError(t, err)
return state
}
func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, sectors []miner.SectorOnChainInfo) cid.Cid {
root := amt.NewAMT(store)
for _, sector := range sectors {
sector := sector
err := root.Set(ctx, uint64(sector.Info.SectorNumber), &sector)
require.NoError(t, err)
}
rootCid, err := root.Flush(ctx)
require.NoError(t, err)
return rootCid
}
// returns a unique SectorOnChainInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo {
info := newSectorPreCommitInfo(sectorNo, sealed, expiration)
return miner.SectorOnChainInfo{
Info: *info,
ActivationEpoch: activation,
DealWeight: weight,
VerifiedDealWeight: weight,
}
}
const (
sectorSealRandEpochValue = abi.ChainEpoch(1)
)
// returns a unique SectorPreCommitInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiration abi.ChainEpoch) *miner.SectorPreCommitInfo {
return &miner.SectorPreCommitInfo{
SealProof: abi.RegisteredSealProof_StackedDrg32GiBV1,
SectorNumber: sectorNo,
SealedCID: sealed,
SealRandEpoch: sectorSealRandEpochValue,
DealIDs: nil,
Expiration: expiration,
}
}