add confidence and timeout to message wait
This commit is contained in:
parent
8aa158d4e6
commit
5574e4f11b
@ -155,7 +155,7 @@ type FullNode interface {
|
|||||||
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
|
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
|
||||||
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
|
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
|
||||||
StatePledgeCollateral(context.Context, types.TipSetKey) (types.BigInt, error)
|
StatePledgeCollateral(context.Context, types.TipSetKey) (types.BigInt, error)
|
||||||
StateWaitMsg(context.Context, cid.Cid) (*MsgLookup, error)
|
StateWaitMsg(context.Context, cid.Cid, uint64, uint64) (*MsgLookup, error)
|
||||||
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
|
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
|
||||||
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
|
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
|
||||||
StateListActors(context.Context, types.TipSetKey) ([]address.Address, error)
|
StateListActors(context.Context, types.TipSetKey) ([]address.Address, error)
|
||||||
|
@ -133,7 +133,7 @@ type FullNodeStruct struct {
|
|||||||
StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"`
|
StateGetActor func(context.Context, address.Address, types.TipSetKey) (*types.Actor, error) `perm:"read"`
|
||||||
StateReadState func(context.Context, *types.Actor, types.TipSetKey) (*api.ActorState, error) `perm:"read"`
|
StateReadState func(context.Context, *types.Actor, types.TipSetKey) (*api.ActorState, error) `perm:"read"`
|
||||||
StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"`
|
StatePledgeCollateral func(context.Context, types.TipSetKey) (types.BigInt, error) `perm:"read"`
|
||||||
StateWaitMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
|
StateWaitMsg func(context.Context, cid.Cid, uint64, uint64) (*api.MsgLookup, error) `perm:"read"`
|
||||||
StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
|
StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
|
||||||
StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
|
StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
|
||||||
StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
|
StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
|
||||||
@ -581,8 +581,8 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, tsk types.Ti
|
|||||||
return c.Internal.StatePledgeCollateral(ctx, tsk)
|
return c.Internal.StatePledgeCollateral(ctx, tsk)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) {
|
func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid, confidence uint64, timeout uint64) (*api.MsgLookup, error) {
|
||||||
return c.Internal.StateWaitMsg(ctx, msgc)
|
return c.Internal.StateWaitMsg(ctx, msgc, confidence, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) {
|
func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) {
|
||||||
|
@ -59,6 +59,8 @@ var BlocksPerEpoch = uint64(builtin.ExpectedLeadersPerEpoch)
|
|||||||
|
|
||||||
// Epochs
|
// Epochs
|
||||||
const Finality = miner.ChainFinalityish
|
const Finality = miner.ChainFinalityish
|
||||||
|
const MessageConfidence = 5
|
||||||
|
const MessageTimeout = 72
|
||||||
|
|
||||||
// constants for Weight calculation
|
// constants for Weight calculation
|
||||||
// The ratio of weight contributed by short-term vs long-term factors in a given round
|
// The ratio of weight contributed by short-term vs long-term factors in a given round
|
||||||
|
@ -480,7 +480,10 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
|
|||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
|
// WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already
|
||||||
|
// happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted
|
||||||
|
// before returning.
|
||||||
|
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64, timeout uint64) (*types.TipSet, *types.MessageReceipt, error) {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -528,6 +531,11 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*type
|
|||||||
close(backSearchWait)
|
close(backSearchWait)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var candidateTs *types.TipSet
|
||||||
|
var candidateRcp *types.MessageReceipt
|
||||||
|
heightOfHead := head[0].Val.Height()
|
||||||
|
reverts := map[types.TipSetKey]bool{}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case notif, ok := <-tsub:
|
case notif, ok := <-tsub:
|
||||||
@ -537,21 +545,49 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*type
|
|||||||
for _, val := range notif {
|
for _, val := range notif {
|
||||||
switch val.Type {
|
switch val.Type {
|
||||||
case store.HCRevert:
|
case store.HCRevert:
|
||||||
continue
|
if val.Val.Equals(candidateTs) {
|
||||||
|
candidateTs = nil
|
||||||
|
candidateRcp = nil
|
||||||
|
}
|
||||||
|
if backSearchWait != nil {
|
||||||
|
reverts[val.Val.Key()] = true
|
||||||
|
}
|
||||||
case store.HCApply:
|
case store.HCApply:
|
||||||
|
if candidateTs != nil && val.Val.Height() >= candidateTs.Height() + abi.ChainEpoch(confidence) {
|
||||||
|
return candidateTs, candidateRcp, nil
|
||||||
|
}
|
||||||
r, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage())
|
r, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if r != nil {
|
if r != nil {
|
||||||
return val.Val, r, nil
|
if confidence == 0 {
|
||||||
|
return val.Val, r, err
|
||||||
|
}
|
||||||
|
candidateTs = val.Val
|
||||||
|
candidateRcp = r
|
||||||
|
}
|
||||||
|
heightOfHead = val.Val.Height()
|
||||||
|
|
||||||
|
// check for timeout
|
||||||
|
if heightOfHead >= head[0].Val.Height() + abi.ChainEpoch(timeout) {
|
||||||
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-backSearchWait:
|
case <-backSearchWait:
|
||||||
if backTs != nil {
|
// check if we found the message in the chain and that is hasn't been reverted since we started searching
|
||||||
|
if backTs != nil && !reverts[backTs.Key()] {
|
||||||
|
// if head is at or past confidence interval, return immediately
|
||||||
|
if heightOfHead >= backTs.Height() + abi.ChainEpoch(confidence) {
|
||||||
return backTs, backRcp, nil
|
return backTs, backRcp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait for confidence interval
|
||||||
|
candidateTs = backTs
|
||||||
|
candidateRcp = backRcp
|
||||||
|
}
|
||||||
|
reverts = nil
|
||||||
backSearchWait = nil
|
backSearchWait = nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, nil, ctx.Err()
|
return nil, nil, ctx.Err()
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -117,7 +118,7 @@ var msigCreateCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wait for it to get mined into a block
|
// wait for it to get mined into a block
|
||||||
wait, err := api.StateWaitMsg(ctx, msgCid)
|
wait, err := api.StateWaitMsg(ctx, msgCid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -333,7 +334,7 @@ var msigProposeCmd = &cli.Command{
|
|||||||
|
|
||||||
fmt.Println("send proposal in message: ", msgCid)
|
fmt.Println("send proposal in message: ", msgCid)
|
||||||
|
|
||||||
wait, err := api.StateWaitMsg(ctx, msgCid)
|
wait, err := api.StateWaitMsg(ctx, msgCid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -449,7 +450,7 @@ var msigApproveCmd = &cli.Command{
|
|||||||
|
|
||||||
fmt.Println("sent approval in message: ", msgCid)
|
fmt.Println("sent approval in message: ", msgCid)
|
||||||
|
|
||||||
wait, err := api.StateWaitMsg(ctx, msgCid)
|
wait, err := api.StateWaitMsg(ctx, msgCid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||||
@ -361,7 +362,7 @@ var paychVoucherSubmitCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mwait, err := api.StateWaitMsg(ctx, mcid)
|
mwait, err := api.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -392,7 +393,7 @@ var stateReplaySetCmd = &cli.Command{
|
|||||||
|
|
||||||
ts, err = types.NewTipSet(headers)
|
ts, err = types.NewTipSet(headers)
|
||||||
} else {
|
} else {
|
||||||
r, err := api.StateWaitMsg(ctx, mcid)
|
r, err := api.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding message in chain: %w", err)
|
return xerrors.Errorf("finding message in chain: %w", err)
|
||||||
}
|
}
|
||||||
@ -1156,7 +1157,7 @@ var stateWaitMsgCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, err := api.StateWaitMsg(ctx, msg)
|
mw, err := api.StateWaitMsg(ctx, msg, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -334,7 +334,7 @@ func (h *handler) msgwait(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, err := h.api.StateWaitMsg(r.Context(), c)
|
mw, err := h.api.StateWaitMsg(r.Context(), c, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(400)
|
w.WriteHeader(400)
|
||||||
w.Write([]byte(err.Error()))
|
w.Write([]byte(err.Error()))
|
||||||
@ -357,7 +357,7 @@ func (h *handler) msgwaitaddr(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, err := h.api.StateWaitMsg(r.Context(), c)
|
mw, err := h.api.StateWaitMsg(r.Context(), c, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(400)
|
w.WriteHeader(400)
|
||||||
w.Write([]byte(err.Error()))
|
w.Write([]byte(err.Error()))
|
||||||
|
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
@ -85,7 +86,7 @@ var verifRegAddVerifierCmd = &cli.Command{
|
|||||||
|
|
||||||
fmt.Printf("message sent, now waiting on cid: %s\n", smsg.Cid())
|
fmt.Printf("message sent, now waiting on cid: %s\n", smsg.Cid())
|
||||||
|
|
||||||
mwait, err := api.StateWaitMsg(ctx, smsg.Cid())
|
mwait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -161,7 +162,7 @@ var verifRegVerifyClientCmd = &cli.Command{
|
|||||||
|
|
||||||
fmt.Printf("message sent, now waiting on cid: %s\n", smsg.Cid())
|
fmt.Printf("message sent, now waiting on cid: %s\n", smsg.Cid())
|
||||||
|
|
||||||
mwait, err := api.StateWaitMsg(ctx, smsg.Cid())
|
mwait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -577,7 +577,7 @@ func configureStorageMiner(ctx context.Context, api lapi.FullNode, addr address.
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Waiting for message: ", smsg.Cid())
|
log.Info("Waiting for message: ", smsg.Cid())
|
||||||
ret, err := api.StateWaitMsg(ctx, smsg.Cid())
|
ret, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -659,7 +659,7 @@ func createStorageMiner(ctx context.Context, api lapi.FullNode, peerid peer.ID,
|
|||||||
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
|
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
|
||||||
log.Infof("Waiting for confirmation")
|
log.Infof("Waiting for confirmation")
|
||||||
|
|
||||||
mw, err := api.StateWaitMsg(ctx, signed.Cid())
|
mw, err := api.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package retrievaladapter
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||||
@ -72,7 +73,7 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet
|
|||||||
// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within
|
// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within
|
||||||
// defaultMsgWaitTimeout it returns error
|
// defaultMsgWaitTimeout it returns error
|
||||||
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
|
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
|
||||||
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)
|
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID, build.MessageConfidence, build.MessageTimeout)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -84,7 +85,7 @@ func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
|
func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
|
||||||
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)
|
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID, build.MessageConfidence, build.MessageTimeout)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
|
@ -211,7 +211,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: timeout
|
// TODO: timeout
|
||||||
_, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage)
|
_, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
|
return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
|
||||||
}
|
}
|
||||||
@ -397,7 +397,7 @@ func (n *ClientNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToke
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *ClientNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error {
|
func (n *ClientNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error {
|
||||||
receipt, err := n.StateWaitMsg(ctx, mcid)
|
receipt, err := n.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cb(0, nil, err)
|
return cb(0, nil, err)
|
||||||
}
|
}
|
||||||
|
@ -338,7 +338,7 @@ func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetTo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error {
|
func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error {
|
||||||
receipt, err := n.StateWaitMsg(ctx, mcid)
|
receipt, err := n.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cb(0, nil, err)
|
return cb(0, nil, err)
|
||||||
}
|
}
|
||||||
|
@ -344,10 +344,8 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate)
|
|||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) {
|
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64, timeout uint64) (*api.MsgLookup, error) {
|
||||||
// TODO: consider using event system for this, expose confidence
|
ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg, confidence, timeout)
|
||||||
|
|
||||||
ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package paychmgr
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||||
@ -55,7 +56,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
|
|||||||
// (tricky because we need to setup channel tracking before we know its address)
|
// (tricky because we need to setup channel tracking before we know its address)
|
||||||
func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) {
|
func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) {
|
||||||
defer pm.store.lk.Unlock()
|
defer pm.store.lk.Unlock()
|
||||||
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
|
mwait, err := pm.state.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("wait msg: %w", err)
|
log.Errorf("wait msg: %w", err)
|
||||||
}
|
}
|
||||||
@ -105,7 +106,7 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres
|
|||||||
// (tricky because we need to setup channel tracking before we know it's address)
|
// (tricky because we need to setup channel tracking before we know it's address)
|
||||||
func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) {
|
func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) {
|
||||||
defer pm.store.lk.Unlock()
|
defer pm.store.lk.Unlock()
|
||||||
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
|
mwait, err := pm.state.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
@ -81,7 +82,7 @@ func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr addres
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) {
|
func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) {
|
||||||
wmsg, err := s.delegate.StateWaitMsg(ctx, mcid)
|
wmsg, err := s.delegate.StateWaitMsg(ctx, mcid, build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sealing.MsgLookup{}, err
|
return sealing.MsgLookup{}, err
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ type storageMinerApi interface {
|
|||||||
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
|
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
|
||||||
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
||||||
StateMinerInitialPledgeCollateral(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (types.BigInt, error)
|
StateMinerInitialPledgeCollateral(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (types.BigInt, error)
|
||||||
StateWaitMsg(context.Context, cid.Cid) (*api.MsgLookup, error) // TODO: removeme eventually
|
StateWaitMsg(context.Context, cid.Cid, uint64, uint64) (*api.MsgLookup, error) // TODO: removeme eventually
|
||||||
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
||||||
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
||||||
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
|
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
|
||||||
|
@ -167,7 +167,7 @@ func (s *WindowPoStScheduler) checkRecoveries(ctx context.Context, deadline uint
|
|||||||
|
|
||||||
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
log.Warnw("declare faults recovered Message CID", "cid", sm.Cid())
|
||||||
|
|
||||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid())
|
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("declare faults recovered wait error: %w", err)
|
return xerrors.Errorf("declare faults recovered wait error: %w", err)
|
||||||
}
|
}
|
||||||
@ -406,7 +406,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
|
|||||||
log.Infof("Submitted window post: %s", sm.Cid())
|
log.Infof("Submitted window post: %s", sm.Cid())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid())
|
rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, build.MessageTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user