lotus/node/impl/full/mpool.go
Steven Allen d192b821a9
fix: gas: estimate gas with a zero base-fee
Otherwise, an account will need funds to estimate the max possible gas a
message could take (which is usually the block gas limit).

This does mean gas estimation no longer checks if the sending account
has enough funds to cover the message cost, but MpoolPush will now do
this.
2022-07-08 09:47:45 -07:00

251 lines
6.8 KiB
Go

package full
import (
"context"
"encoding/json"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/messagesigner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
type MpoolModuleAPI interface {
MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error)
}
var _ MpoolModuleAPI = *new(api.FullNode)
// MpoolModule provides a default implementation of MpoolModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type MpoolModule struct {
fx.In
Mpool *messagepool.MessagePool
}
var _ MpoolModuleAPI = (*MpoolModule)(nil)
type MpoolAPI struct {
fx.In
MpoolModuleAPI
WalletAPI
GasAPI
MessageSigner *messagesigner.MessageSigner
PushLocks *dtypes.MpoolLocker
}
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
return a.Mpool.GetConfig(), nil
}
func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) error {
return a.Mpool.SetConfig(ctx, cfg)
}
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQuality float64) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Mpool.SelectMessages(ctx, ts, ticketQuality)
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
pending, mpts := a.Mpool.Pending(ctx)
haveCids := map[cid.Cid]struct{}{}
for _, m := range pending {
haveCids[m.Cid()] = struct{}{}
}
if ts == nil || mpts.Height() > ts.Height() {
return pending, nil
}
for {
if mpts.Height() == ts.Height() {
if mpts.Equals(ts) {
return pending, nil
}
// different blocks in tipsets of the same height
// we exclude messages that have been included in blocks in the mpool tipset
have, err := a.Mpool.MessagesForBlocks(ctx, mpts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := a.Mpool.MessagesForBlocks(ctx, ts.Blocks())
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
for _, m := range msgs {
if _, ok := haveCids[m.Cid()]; ok {
continue
}
haveCids[m.Cid()] = struct{}{}
pending = append(pending, m)
}
if mpts.Height() >= ts.Height() {
return pending, nil
}
ts, err = a.Chain.LoadTipSet(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading parent tipset: %w", err)
}
}
}
func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
a.Mpool.Clear(ctx, local)
return nil
}
func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return m.Mpool.Push(ctx, smsg)
}
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.PushUntrusted(ctx, smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
cp := *msg
msg = &cp
inMsg := *msg
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
if err != nil {
return nil, xerrors.Errorf("getting key address: %w", err)
}
{
done, err := a.PushLocks.TakeLock(ctx, fromA)
if err != nil {
return nil, xerrors.Errorf("taking lock: %w", err)
}
defer done()
}
if msg.Nonce != 0 {
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
}
msg, err = a.GasAPI.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}
if msg.GasPremium.GreaterThan(msg.GasFeeCap) {
inJson, _ := json.Marshal(inMsg)
outJson, _ := json.Marshal(msg)
return nil, xerrors.Errorf("After estimation, GasPremium is greater than GasFeeCap, inmsg: %s, outmsg: %s",
inJson, outJson)
}
if msg.From.Protocol() == address.ID {
log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, fromA)
msg.From = fromA
}
b, err := a.WalletBalance(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
if b.LessThan(requiredFunds) {
return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
}
// Sign and push the message
return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil {
return xerrors.Errorf("mpool push: failed to push message: %w", err)
}
return nil
})
}
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
var messageCids []cid.Cid
for _, smsg := range smsgs {
smsgCid, err := a.Mpool.Push(ctx, smsg)
if err != nil {
return messageCids, err
}
messageCids = append(messageCids, smsgCid)
}
return messageCids, nil
}
func (a *MpoolAPI) MpoolBatchPushUntrusted(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
var messageCids []cid.Cid
for _, smsg := range smsgs {
smsgCid, err := a.Mpool.PushUntrusted(ctx, smsg)
if err != nil {
return messageCids, err
}
messageCids = append(messageCids, smsgCid)
}
return messageCids, nil
}
func (a *MpoolAPI) MpoolBatchPushMessage(ctx context.Context, msgs []*types.Message, spec *api.MessageSendSpec) ([]*types.SignedMessage, error) {
var smsgs []*types.SignedMessage
for _, msg := range msgs {
smsg, err := a.MpoolPushMessage(ctx, msg, spec)
if err != nil {
return smsgs, err
}
smsgs = append(smsgs, smsg)
}
return smsgs, nil
}
func (a *MpoolAPI) MpoolCheckMessages(ctx context.Context, protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckMessages(ctx, protos)
}
func (a *MpoolAPI) MpoolCheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckPendingMessages(ctx, from)
}
func (a *MpoolAPI) MpoolCheckReplaceMessages(ctx context.Context, msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckReplaceMessages(ctx, msgs)
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(ctx, addr, types.EmptyTSK)
}
func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {
return a.Mpool.Updates(ctx)
}