feat: add RPC for StateWaitMsg

This commit is contained in:
Dirk McCormick 2020-10-02 16:14:30 +02:00
parent be09a8a00a
commit 767611247c
9 changed files with 127 additions and 47 deletions

View File

@ -367,6 +367,10 @@ type FullNode interface {
// StateWaitMsg looks back in the chain for a message. If not found, it blocks until the
// message arrives on chain, and gets to the indicated confidence depth.
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*MsgLookup, error)
// StateWaitMsgLimited looks back up to limit epochs in the chain for a message.
// If not found, it blocks until the message arrives on chain, and gets to the
// indicated confidence depth.
StateWaitMsgLimited(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch) (*MsgLookup, error)
// StateListMiners returns the addresses of every miner that has claimed power in the Power Actor
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
// StateListActors returns the addresses of every actor in the state

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
@ -11,8 +12,10 @@ import (
type GatewayAPI interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*MsgLookup, error)
}

View File

@ -190,6 +190,7 @@ type FullNodeStruct struct {
StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"`
StateMsgGasCost func(context.Context, cid.Cid, types.TipSetKey) (*api.MsgGasCost, error) `perm:"read"`
StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"`
StateWaitMsgLimited func(context.Context, cid.Cid, uint64, abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"`
StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
@ -364,12 +365,14 @@ type WorkerStruct struct {
type GatewayStruct struct {
Internal struct {
// TODO: does the gateway need perms?
ChainGetTipSet func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainHead func(ctx context.Context) (*types.TipSet, error)
MpoolPush func(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
StateAccountKey func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor func(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
ChainGetTipSet func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight func(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainHead func(ctx context.Context) (*types.TipSet, error)
MpoolPush func(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
StateAccountKey func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor func(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
}
@ -866,6 +869,10 @@ func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid, confide
return c.Internal.StateWaitMsg(ctx, msgc, confidence)
}
func (c *FullNodeStruct) StateWaitMsgLimited(ctx context.Context, msgc cid.Cid, confidence uint64, limit abi.ChainEpoch) (*api.MsgLookup, error) {
return c.Internal.StateWaitMsgLimited(ctx, msgc, confidence, limit)
}
func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) {
return c.Internal.StateSearchMsg(ctx, msgc)
}
@ -1392,6 +1399,10 @@ func (g GatewayStruct) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey)
return g.Internal.ChainGetTipSet(ctx, tsk)
}
func (g GatewayStruct) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return g.Internal.ChainGetTipSetByHeight(ctx, h, tsk)
}
func (g GatewayStruct) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
return g.Internal.MpoolPush(ctx, sm)
}
@ -1408,6 +1419,10 @@ func (g GatewayStruct) StateLookupID(ctx context.Context, addr address.Address,
return g.Internal.StateLookupID(ctx, addr, tsk)
}
func (g GatewayStruct) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return g.Internal.StateWaitMsg(ctx, msg, confidence)
}
var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{}

View File

@ -38,6 +38,8 @@ import (
"github.com/filecoin-project/lotus/chain/vm"
)
const LookbackNoLimit = abi.ChainEpoch(-1)
var log = logging.Logger("statemgr")
type StateManagerAPI interface {
@ -514,7 +516,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
return nil, fmt.Errorf("failed to load message: %w", err)
}
_, r, _, err := sm.searchBackForMsg(ctx, ts, m)
_, r, _, err := sm.searchBackForMsg(ctx, ts, m, LookbackNoLimit)
if err != nil {
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
}
@ -523,9 +525,9 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
}
// WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already
// happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted
// before returning.
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
// happened, with an optional limit to how many epochs it will search. It guarantees that the message has been on
// chain for at least confidence epochs without being reverted before returning.
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -563,7 +565,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
var backFm cid.Cid
backSearchWait := make(chan struct{})
go func() {
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg)
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg, lookbackLimit)
if err != nil {
log.Warnf("failed to look back through chain for message: %w", err)
return
@ -655,7 +657,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
return head, r, foundMsg, nil
}
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg)
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg, LookbackNoLimit)
if err != nil {
log.Warnf("failed to look back through chain for message %s", mcid)
@ -669,7 +671,15 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
return fts, r, foundMsg, nil
}
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
// searchBackForMsg searches up to limit tipsets backwards from the given
// tipset for a message receipt.
// If limit is
// - 0 then no tipsets are searched
// - 5 then five tipset are searched
// - LookbackNoLimit then there is no limit
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg, limit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
limitHeight := from.Height() - limit
noLimit := limit == LookbackNoLimit
cur := from
curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
@ -685,7 +695,9 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
mNonce := m.VMMessage().Nonce
for {
if cur.Height() == 0 {
// If we've reached the genesis block, or we've reached the limit of
// how far back to look
if cur.Height() == 0 || !noLimit && cur.Height() <= limitHeight {
// it ain't here!
return nil, nil, cid.Undef, nil
}

View File

@ -6,13 +6,18 @@ import (
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/ipfs/go-cid"
)
const LookbackCap = time.Hour
const (
LookbackCap = time.Hour
stateWaitLookbackLimit = abi.ChainEpoch(20)
)
var (
ErrLookbackTooLong = fmt.Errorf("lookbacks of more than %s are disallowed", LookbackCap)
@ -22,26 +27,41 @@ type GatewayAPI struct {
api api.FullNode
}
func (a *GatewayAPI) getTipsetTimestamp(ctx context.Context, tsk types.TipSetKey) (time.Time, error) {
func (a *GatewayAPI) checkTipsetKey(ctx context.Context, tsk types.TipSetKey) error {
if tsk.IsEmpty() {
return time.Now(), nil
return nil
}
ts, err := a.api.ChainGetTipSet(ctx, tsk)
if err != nil {
return time.Time{}, err
}
return time.Unix(int64(ts.Blocks()[0].Timestamp), 0), nil
}
func (a *GatewayAPI) checkTipset(ctx context.Context, ts types.TipSetKey) error {
when, err := a.getTipsetTimestamp(ctx, ts)
if err != nil {
return err
}
if time.Since(when) > time.Hour {
return a.checkTipset(ts)
}
func (a *GatewayAPI) checkTipset(ts *types.TipSet) error {
at := time.Unix(int64(ts.Blocks()[0].Timestamp), 0)
if err := a.checkTimestamp(at); err != nil {
return fmt.Errorf("bad tipset: %w", err)
}
return nil
}
// TODO: write tests for this check
func (a *GatewayAPI) checkTipsetHeight(ts *types.TipSet, h abi.ChainEpoch) error {
tsBlock := ts.Blocks()[0]
heightDelta := time.Duration(uint64(tsBlock.Height-h)*build.BlockDelaySecs) * time.Second
timeAtHeight := time.Unix(int64(tsBlock.Timestamp), 0).Add(-heightDelta)
if err := a.checkTimestamp(timeAtHeight); err != nil {
return fmt.Errorf("bad tipset height: %w", err)
}
return nil
}
func (a *GatewayAPI) checkTimestamp(at time.Time) error {
if time.Since(at) > LookbackCap {
return ErrLookbackTooLong
}
@ -55,12 +75,26 @@ func (a *GatewayAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
}
func (a *GatewayAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
if err := a.checkTipset(ctx, tsk); err != nil {
return nil, fmt.Errorf("bad tipset: %w", err)
return a.api.ChainGetTipSet(ctx, tsk)
}
func (a *GatewayAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := a.api.ChainGetTipSet(ctx, tsk)
if err != nil {
return nil, err
}
// TODO: since we're limiting lookbacks, should just cache this (could really even cache the json response bytes)
return a.api.ChainGetTipSet(ctx, tsk)
// Check if the tipset key refers to a tipset that's too far in the past
if err := a.checkTipset(ts); err != nil {
return nil, err
}
// Check if the height is too far in the past
if err := a.checkTipsetHeight(ts, h); err != nil {
return nil, err
}
return a.api.ChainGetTipSetByHeight(ctx, h, tsk)
}
func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
@ -70,30 +104,34 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci
}
func (a *GatewayAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
if err := a.checkTipset(ctx, tsk); err != nil {
return address.Undef, fmt.Errorf("bad tipset: %w", err)
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return address.Undef, err
}
return a.api.StateAccountKey(ctx, addr, tsk)
}
func (a *GatewayAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
if err := a.checkTipset(ctx, tsk); err != nil {
return nil, fmt.Errorf("bad tipset: %w", err)
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return a.api.StateGetActor(ctx, actor, tsk)
}
func (a *GatewayAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
if err := a.checkTipset(ctx, tsk); err != nil {
return address.Undef, fmt.Errorf("bad tipset: %w", err)
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return address.Undef, err
}
return a.api.StateLookupID(ctx, addr, tsk)
}
var _ api.GatewayAPI = &GatewayAPI{}
func (a *GatewayAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return a.api.StateWaitMsgLimited(ctx, msg, confidence, stateWaitLookbackLimit)
}
var _ api.GatewayAPI = (*GatewayAPI)(nil)
var _ full.ChainModuleAPI = (*GatewayAPI)(nil)
var _ full.MpoolModuleAPI = (*GatewayAPI)(nil)
var _ full.StateModuleAPI = (*GatewayAPI)(nil)

View File

@ -244,7 +244,7 @@ var DaemonCmd = &cli.Command{
shutdownChan := make(chan struct{})
// If the daemon is started in "lite mode", replace the StateManager
// If the daemon is started in "lite mode", replace key APIs
// with a thin client to a gateway server
liteMode := node.Options()
isLite := cctx.Bool("lite")

View File

@ -187,7 +187,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
}
// TODO: timeout
_, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence)
_, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence, stmgr.LookbackNoLimit)
if err != nil {
return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
}

View File

@ -42,6 +42,7 @@ var log = logging.Logger("fullnode")
type ChainModuleAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
}
// ChainModule provides a default implementation of ChainModuleAPI.
@ -197,12 +198,12 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
return out, nil
}
func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Chain.GetTipsetByHeight(ctx, h, ts, true)
return m.Chain.GetTipsetByHeight(ctx, h, ts, true)
}
func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {

View File

@ -46,6 +46,7 @@ type StateModuleAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
// StateModule provides a default implementation of StateModuleAPI.
@ -475,22 +476,28 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate)
return &out, nil
}
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
ts, recpt, found, err := a.StateManager.WaitForMessage(ctx, msg, confidence)
func (m *StateModule) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return stateWaitMsgLimited(ctx, m.StateManager, m.Chain, msg, confidence, stmgr.LookbackNoLimit)
}
func (a *StateAPI) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) {
return stateWaitMsgLimited(ctx, a.StateManager, a.Chain, msg, confidence, lookbackLimit)
}
func stateWaitMsgLimited(ctx context.Context, smgr *stmgr.StateManager, cstore *store.ChainStore, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) {
ts, recpt, found, err := smgr.WaitForMessage(ctx, msg, confidence, lookbackLimit)
if err != nil {
return nil, err
}
var returndec interface{}
if recpt.ExitCode == 0 && len(recpt.Return) > 0 {
cmsg, err := a.Chain.GetCMessage(msg)
cmsg, err := cstore.GetCMessage(msg)
if err != nil {
return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err)
}
vmsg := cmsg.VMMessage()
t, err := stmgr.GetReturnType(ctx, a.StateManager, vmsg.To, vmsg.Method, ts)
t, err := stmgr.GetReturnType(ctx, smgr, vmsg.To, vmsg.Method, ts)
if err != nil {
return nil, xerrors.Errorf("failed to get return type: %w", err)
}