lotus/chain/events/state/predicates.go

535 lines
16 KiB
Go
Raw Normal View History

2020-06-22 21:38:40 +00:00
package state
import (
"bytes"
2020-06-22 21:38:40 +00:00
"context"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
typegen "github.com/whyrusleeping/cbor-gen"
2020-06-22 21:38:40 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
2020-06-22 21:38:40 +00:00
)
// UserData is the data returned from the DiffTipSetKeyFunc
2020-06-25 21:43:37 +00:00
type UserData interface{}
2020-06-26 19:36:48 +00:00
// ChainAPI abstracts out calls made by this class to external APIs
type ChainAPI interface {
2020-06-26 15:51:45 +00:00
apibstore.ChainIO
2020-06-25 21:43:37 +00:00
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
}
2020-06-26 19:36:48 +00:00
// StatePredicates has common predicates for responding to state changes
2020-06-22 21:38:40 +00:00
type StatePredicates struct {
2020-06-26 19:36:48 +00:00
api ChainAPI
2020-06-25 21:43:37 +00:00
cst *cbor.BasicIpldStore
2020-06-22 21:38:40 +00:00
}
2020-06-26 19:36:48 +00:00
func NewStatePredicates(api ChainAPI) *StatePredicates {
2020-06-22 21:38:40 +00:00
return &StatePredicates{
2020-06-25 21:43:37 +00:00
api: api,
cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)),
2020-06-22 21:38:40 +00:00
}
}
// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns
2020-06-26 18:59:23 +00:00
// - changed: was there a change
// - user: user-defined data representing the state change
// - err
type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error)
2020-06-25 21:43:37 +00:00
type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
2020-06-22 21:38:40 +00:00
2020-06-26 19:36:48 +00:00
// OnActorStateChanged calls diffStateFunc when the state changes for the given actor
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc {
return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState)
2020-06-22 21:38:40 +00:00
if err != nil {
return false, nil, err
}
newActor, err := sp.api.StateGetActor(ctx, addr, newState)
2020-06-26 19:36:48 +00:00
if err != nil {
return false, nil, err
}
2020-06-22 21:38:40 +00:00
if oldActor.Head.Equals(newActor.Head) {
return false, nil, nil
}
return diffStateFunc(ctx, oldActor.Head, newActor.Head)
}
}
type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error)
2020-06-26 19:36:48 +00:00
// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc {
2020-06-22 21:38:40 +00:00
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState market.State
2020-06-25 21:43:37 +00:00
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
2020-06-22 21:38:40 +00:00
return false, nil, err
}
var newState market.State
2020-06-25 21:43:37 +00:00
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
2020-06-22 21:38:40 +00:00
return false, nil, err
}
return diffStorageMarketState(ctx, &oldState, &newState)
})
}
type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error)
2020-06-22 21:38:40 +00:00
// OnDealStateChanged calls diffDealStates when the market deal state changes
func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) DiffStorageMarketStateFunc {
2020-06-22 21:38:40 +00:00
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
if oldState.States.Equals(newState.States) {
return false, nil, nil
}
2020-06-25 21:43:37 +00:00
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
oldRoot, err := adt.AsArray(ctxStore, oldState.States)
2020-06-22 21:38:40 +00:00
if err != nil {
return false, nil, err
}
newRoot, err := adt.AsArray(ctxStore, newState.States)
2020-06-22 21:38:40 +00:00
if err != nil {
return false, nil, err
}
2020-06-25 21:43:37 +00:00
2020-06-22 21:38:40 +00:00
return diffDealStates(ctx, oldRoot, newRoot)
}
}
// OnDealProposalChanged calls diffDealProps when the market proposal state changes
func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc) DiffStorageMarketStateFunc {
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
if oldState.Proposals.Equals(newState.Proposals) {
return false, nil, nil
}
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
oldRoot, err := adt.AsArray(ctxStore, oldState.Proposals)
if err != nil {
return false, nil, err
}
newRoot, err := adt.AsArray(ctxStore, newState.Proposals)
if err != nil {
return false, nil, err
}
return diffDealProps(ctx, oldRoot, newRoot)
}
}
var _ AdtArrayDiff = &MarketDealProposalChanges{}
type MarketDealProposalChanges struct {
Added []ProposalIDState
Removed []ProposalIDState
}
type ProposalIDState struct {
ID abi.DealID
Proposal market.DealProposal
}
func (m *MarketDealProposalChanges) Add(key uint64, val *typegen.Deferred) error {
dp := new(market.DealProposal)
err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, ProposalIDState{abi.DealID(key), *dp})
return nil
}
func (m *MarketDealProposalChanges) Modify(key uint64, from, to *typegen.Deferred) error {
// short circuit, DealProposals are static
return nil
}
func (m *MarketDealProposalChanges) Remove(key uint64, val *typegen.Deferred) error {
dp := new(market.DealProposal)
err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, ProposalIDState{abi.DealID(key), *dp})
return nil
}
// OnDealProposalAmtChanged detects changes in the deal proposal AMT for all deal proposals and returns a MarketProposalsChanges structure containing:
// - Added Proposals
// - Modified Proposals
// - Removed Proposals
func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc {
return func(ctx context.Context, oldDealProps, newDealProps *adt.Array) (changed bool, user UserData, err error) {
proposalChanges := new(MarketDealProposalChanges)
if err := DiffAdtArray(oldDealProps, newDealProps, proposalChanges); err != nil {
return false, nil, err
}
if len(proposalChanges.Added)+len(proposalChanges.Removed) == 0 {
return false, nil, nil
}
return true, proposalChanges, nil
}
}
var _ AdtArrayDiff = &MarketDealStateChanges{}
type MarketDealStateChanges struct {
Added []DealIDState
Modified []DealStateChange
Removed []DealIDState
}
type DealIDState struct {
ID abi.DealID
Deal market.DealState
}
func (m *MarketDealStateChanges) Add(key uint64, val *typegen.Deferred) error {
ds := new(market.DealState)
err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, DealIDState{abi.DealID(key), *ds})
return nil
}
func (m *MarketDealStateChanges) Modify(key uint64, from, to *typegen.Deferred) error {
dsFrom := new(market.DealState)
if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil {
return err
}
dsTo := new(market.DealState)
if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil {
return err
}
if *dsFrom != *dsTo {
m.Modified = append(m.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo})
}
return nil
}
func (m *MarketDealStateChanges) Remove(key uint64, val *typegen.Deferred) error {
ds := new(market.DealState)
err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, DealIDState{abi.DealID(key), *ds})
return nil
}
// OnDealStateAmtChanged detects changes in the deal state AMT for all deal states and returns a MarketDealStateChanges structure containing:
// - Added Deals
// - Modified Deals
// - Removed Deals
func (sp *StatePredicates) OnDealStateAmtChanged() DiffAdtArraysFunc {
return func(ctx context.Context, oldDealStates, newDealStates *adt.Array) (changed bool, user UserData, err error) {
dealStateChanges := new(MarketDealStateChanges)
if err := DiffAdtArray(oldDealStates, newDealStates, dealStateChanges); err != nil {
return false, nil, err
}
if len(dealStateChanges.Added)+len(dealStateChanges.Modified)+len(dealStateChanges.Removed) == 0 {
return false, nil, nil
}
return true, dealStateChanges, nil
}
}
2020-06-26 19:36:48 +00:00
// ChangedDeals is a set of changes to deal state
2020-06-26 15:51:45 +00:00
type ChangedDeals map[abi.DealID]DealStateChange
2020-06-26 19:36:48 +00:00
// DealStateChange is a change in deal state from -> to
2020-06-26 15:51:45 +00:00
type DealStateChange struct {
ID abi.DealID
From *market.DealState
To *market.DealState
2020-06-26 15:51:45 +00:00
}
2020-06-26 19:36:48 +00:00
// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs
func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtArraysFunc {
return func(ctx context.Context, oldDealStateArray, newDealStateArray *adt.Array) (changed bool, user UserData, err error) {
2020-06-26 15:51:45 +00:00
changedDeals := make(ChangedDeals)
2020-06-26 19:36:48 +00:00
for _, dealID := range dealIds {
var oldDealPtr, newDealPtr *market.DealState
2020-06-25 21:43:37 +00:00
var oldDeal, newDeal market.DealState
// If the deal has been removed, we just set it to nil
found, err := oldDealStateArray.Get(uint64(dealID), &oldDeal)
2020-06-25 21:43:37 +00:00
if err != nil {
return false, nil, err
}
if found {
oldDealPtr = &oldDeal
}
found, err = newDealStateArray.Get(uint64(dealID), &newDeal)
2020-06-25 21:43:37 +00:00
if err != nil {
return false, nil, err
}
if found {
newDealPtr = &newDeal
2020-06-25 21:43:37 +00:00
}
if oldDeal != newDeal {
changedDeals[dealID] = DealStateChange{dealID, oldDealPtr, newDealPtr}
2020-06-25 21:43:37 +00:00
}
2020-06-22 21:38:40 +00:00
}
2020-06-25 21:43:37 +00:00
if len(changedDeals) > 0 {
2020-06-26 15:51:45 +00:00
return true, changedDeals, nil
2020-06-22 21:38:40 +00:00
}
2020-06-25 21:43:37 +00:00
return false, nil, nil
2020-06-22 21:38:40 +00:00
}
}
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState miner.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState miner.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffMinerActorState(ctx, &oldState, &newState)
})
}
type MinerSectorChanges struct {
Added []miner.SectorOnChainInfo
Extended []SectorExtensions
Removed []miner.SectorOnChainInfo
}
var _ AdtArrayDiff = &MinerSectorChanges{}
type SectorExtensions struct {
From miner.SectorOnChainInfo
To miner.SectorOnChainInfo
}
func (m *MinerSectorChanges) Add(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *si)
return nil
}
func (m *MinerSectorChanges) Modify(key uint64, from, to *typegen.Deferred) error {
siFrom := new(miner.SectorOnChainInfo)
err := siFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
if err != nil {
return err
}
siTo := new(miner.SectorOnChainInfo)
err = siTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
if err != nil {
return err
}
if siFrom.Expiration != siTo.Expiration {
m.Extended = append(m.Extended, SectorExtensions{
From: *siFrom,
To: *siTo,
})
}
return nil
}
func (m *MinerSectorChanges) Remove(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *si)
return nil
}
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
sectorChanges := &MinerSectorChanges{
Added: []miner.SectorOnChainInfo{},
Extended: []SectorExtensions{},
Removed: []miner.SectorOnChainInfo{},
}
// no sector changes
if oldState.Sectors.Equals(newState.Sectors) {
return false, nil, nil
}
oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors)
if err != nil {
return false, nil, err
}
newSectors, err := adt.AsArray(ctxStore, newState.Sectors)
if err != nil {
return false, nil, err
}
if err := DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil {
return false, nil, err
}
// nothing changed
if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 {
return false, nil, nil
}
return true, sectorChanges, nil
}
}
type MinerPreCommitChanges struct {
Added []miner.SectorPreCommitOnChainInfo
Removed []miner.SectorPreCommitOnChainInfo
}
func (m *MinerPreCommitChanges) AsKey(key string) (adt.Keyer, error) {
sector, err := adt.ParseUIntKey(key)
if err != nil {
return nil, err
}
return miner.SectorKey(abi.SectorNumber(sector)), nil
}
func (m *MinerPreCommitChanges) Add(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *sp)
return nil
}
func (m *MinerPreCommitChanges) Modify(key string, from, to *typegen.Deferred) error {
return nil
}
func (m *MinerPreCommitChanges) Remove(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *sp)
return nil
}
func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
precommitChanges := &MinerPreCommitChanges{
Added: []miner.SectorPreCommitOnChainInfo{},
Removed: []miner.SectorPreCommitOnChainInfo{},
}
if oldState.PreCommittedSectors.Equals(newState.PreCommittedSectors) {
return false, nil, nil
}
oldPrecommits, err := adt.AsMap(ctxStore, oldState.PreCommittedSectors)
if err != nil {
return false, nil, err
}
newPrecommits, err := adt.AsMap(ctxStore, newState.PreCommittedSectors)
if err != nil {
return false, nil, err
}
if err := DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil {
return false, nil, err
}
if len(precommitChanges.Added)+len(precommitChanges.Removed) == 0 {
return false, nil, nil
}
return true, precommitChanges, nil
}
}
// DiffPaymentChannelStateFunc is function that compares two states for the payment channel
type DiffPaymentChannelStateFunc func(ctx context.Context, oldState *paych.State, newState *paych.State) (changed bool, user UserData, err error)
// OnPaymentChannelActorChanged calls diffPaymentChannelState when the state changes for the the payment channel actor
func (sp *StatePredicates) OnPaymentChannelActorChanged(paychAddr address.Address, diffPaymentChannelState DiffPaymentChannelStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(paychAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState paych.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState paych.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffPaymentChannelState(ctx, &oldState, &newState)
})
}
// PayChToSendChange is a difference in the amount to send on a payment channel when the money is collected
type PayChToSendChange struct {
OldToSend abi.TokenAmount
NewToSend abi.TokenAmount
}
// OnToSendAmountChanges monitors changes on the total amount to send from one party to the other on a payment channel
func (sp *StatePredicates) OnToSendAmountChanges() DiffPaymentChannelStateFunc {
return func(ctx context.Context, oldState *paych.State, newState *paych.State) (changed bool, user UserData, err error) {
if oldState.ToSend.Equals(newState.ToSend) {
return false, nil, nil
}
return true, &PayChToSendChange{
OldToSend: oldState.ToSend,
NewToSend: newState.ToSend,
}, nil
}
}