lotus/node/impl/full/mpool.go

191 lines
4.8 KiB
Go
Raw Normal View History

2019-08-20 16:48:33 +00:00
package full
import (
"context"
2020-08-12 17:06:08 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
2019-10-02 18:00:08 +00:00
2019-12-03 19:33:29 +00:00
"github.com/ipfs/go-cid"
2019-08-20 16:48:33 +00:00
"go.uber.org/fx"
2019-09-16 14:17:08 +00:00
"golang.org/x/xerrors"
2019-08-20 16:48:33 +00:00
"github.com/filecoin-project/go-address"
2019-11-17 07:44:06 +00:00
"github.com/filecoin-project/lotus/api"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2019-12-03 19:33:29 +00:00
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-08-20 16:48:33 +00:00
)
type MpoolAPI struct {
fx.In
2019-09-16 14:17:08 +00:00
WalletAPI
GasAPI
2019-09-16 14:17:08 +00:00
2019-12-03 19:33:29 +00:00
Chain *store.ChainStore
2019-12-01 23:11:43 +00:00
Mpool *messagepool.MessagePool
PushLocks *dtypes.MpoolLocker
2019-08-20 16:48:33 +00:00
}
2020-08-07 14:45:31 +00:00
// MpoolGetConfig returns the configuration reported by Mpool.GetConfig.
// The context argument is ignored and the returned error is always nil.
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
	current := a.Mpool.GetConfig()
	return current, nil
}
func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) error {
return a.Mpool.SetConfig(cfg)
2020-08-07 14:45:31 +00:00
}
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQuality float64) ([]*types.SignedMessage, error) {
2020-08-05 20:17:14 +00:00
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Mpool.SelectMessages(ts, ticketQuality)
2020-08-05 20:17:14 +00:00
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
2019-12-03 19:33:29 +00:00
pending, mpts := a.Mpool.Pending()
haveCids := map[cid.Cid]struct{}{}
for _, m := range pending {
haveCids[m.Cid()] = struct{}{}
}
2019-12-03 19:34:31 +00:00
if ts == nil || mpts.Height() > ts.Height() {
2019-12-03 19:33:29 +00:00
return pending, nil
}
for {
if mpts.Height() == ts.Height() {
if mpts.Equals(ts) {
return pending, nil
}
// different blocks in tipsets
have, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
for _, m := range msgs {
if _, ok := haveCids[m.Cid()]; ok {
continue
}
haveCids[m.Cid()] = struct{}{}
pending = append(pending, m)
}
if mpts.Height() >= ts.Height() {
return pending, nil
}
ts, err = a.Chain.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading parent tipset: %w", err)
}
}
2019-08-20 16:48:33 +00:00
}
2020-01-07 16:44:55 +00:00
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
2019-09-16 14:17:08 +00:00
return a.Mpool.Push(smsg)
}
2020-08-12 17:06:08 +00:00
func capGasFee(msg *types.Message, maxFee abi.TokenAmount) {
if maxFee.Equals(big.Zero()) {
return
}
2020-08-13 12:16:29 +00:00
gl := types.NewInt(uint64(msg.GasLimit))
2020-08-12 20:01:31 +00:00
totalFee := types.BigMul(msg.GasFeeCap, gl)
minerFee := types.BigMul(msg.GasPremium, gl)
2020-08-12 17:06:08 +00:00
2020-08-12 20:01:31 +00:00
if totalFee.LessThanEqual(maxFee) {
2020-08-12 17:06:08 +00:00
return
}
// scale chain/miner fee down proportionally to fit in our budget
// TODO: there are probably smarter things we can do here to optimize
// message inclusion latency
2020-08-12 20:01:31 +00:00
msg.GasFeeCap = big.Div(maxFee, gl)
msg.GasPremium = big.Div(big.Div(big.Mul(minerFee, maxFee), totalFee), gl)
2020-08-12 17:06:08 +00:00
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
{
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
if err != nil {
return nil, xerrors.Errorf("getting key address: %w", err)
}
done, err := a.PushLocks.TakeLock(ctx, fromA)
if err != nil {
return nil, xerrors.Errorf("taking lock: %w", err)
}
defer done()
}
2019-09-16 14:17:08 +00:00
if msg.Nonce != 0 {
2019-09-17 08:15:26 +00:00
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
2019-08-20 16:48:33 +00:00
}
2020-08-19 21:25:58 +00:00
msg, err := a.GasAPI.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}
2019-08-20 16:48:33 +00:00
sign := func(from address.Address, nonce uint64) (*types.SignedMessage, error) {
2019-09-16 14:17:08 +00:00
msg.Nonce = nonce
2020-04-17 16:47:14 +00:00
if msg.From.Protocol() == address.ID {
log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, from)
msg.From = from
}
b, err := a.WalletBalance(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
if b.LessThan(msg.Value) {
return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value)
}
return a.WalletSignMessage(ctx, from, msg)
}
var m *types.SignedMessage
again:
m, err = a.Mpool.PushWithNonce(ctx, msg.From, sign)
if err == messagepool.ErrTryAgain {
log.Debugf("temporary failure while pushing message: %s; retrying", err)
goto again
}
return m, err
2019-08-20 16:48:33 +00:00
}
// MpoolGetNonce returns the result of Mpool.GetNonce for addr unchanged.
// ctx is not used.
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
	nonce, nonceErr := a.Mpool.GetNonce(addr)
	return nonce, nonceErr
}
2019-11-17 07:44:06 +00:00
// MpoolSub returns the channel of message pool updates obtained from
// Mpool.Updates, which receives ctx.
func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {
	updates, subErr := a.Mpool.Updates(ctx)
	return updates, subErr
}