package full import ( "context" "sync" "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) type MpoolAPI struct { fx.In WalletAPI GasAPI Chain *store.ChainStore Mpool *messagepool.MessagePool PushLocks struct { m map[address.Address]chan struct{} sync.Mutex } `name:"verymuchunique" optional:"true"` } func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) { return a.Mpool.GetConfig(), nil } func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) error { a.Mpool.SetConfig(cfg) return nil } func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } return a.Mpool.SelectMessages(ts) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } pending, mpts := a.Mpool.Pending() haveCids := map[cid.Cid]struct{}{} for _, m := range pending { haveCids[m.Cid()] = struct{}{} } if ts == nil || mpts.Height() > ts.Height() { return pending, nil } for { if mpts.Height() == ts.Height() { if mpts.Equals(ts) { return pending, nil } // different blocks in tipsets have, err := a.Mpool.MessagesForBlocks(ts.Blocks()) if err != nil { return nil, xerrors.Errorf("getting messages for base ts: %w", err) } for _, m := range have { haveCids[m.Cid()] = struct{}{} } } msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks()) if err != nil { return nil, xerrors.Errorf(": %w", err) } for _, m := range msgs { if _, ok := haveCids[m.Cid()]; ok { continue } haveCids[m.Cid()] = struct{}{} pending = append(pending, m) } if mpts.Height() >= ts.Height() { return pending, nil } ts, err = a.Chain.LoadTipSet(ts.Parents()) if err != nil { return nil, xerrors.Errorf("loading parent tipset: %w", err) } } } func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { return a.Mpool.Push(smsg) } func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) { { fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil) if err != nil { return nil, xerrors.Errorf("getting key address: %w", err) } a.PushLocks.Lock() if a.PushLocks.m == nil { a.PushLocks.m = make(map[address.Address]chan struct{}) } lk, ok := a.PushLocks.m[fromA] if !ok { lk = make(chan struct{}, 1) a.PushLocks.m[msg.From] = lk } a.PushLocks.Unlock() select { case lk <- struct{}{}: case <-ctx.Done(): return nil, ctx.Err() } defer func() { <-lk }() } if msg.Nonce != 0 { return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce) } if msg.GasLimit == 0 { gasLimit, err := a.GasEstimateGasLimit(ctx, msg, types.TipSetKey{}) if err != nil { return nil, xerrors.Errorf("estimating gas used: %w", err) } msg.GasLimit = int64(float64(gasLimit) * a.Mpool.GetConfig().GasLimitOverestimation) } if msg.GasPremium == types.EmptyInt || types.BigCmp(msg.GasPremium, types.NewInt(0)) == 0 { gasPremium, err := a.GasEsitmateGasPremium(ctx, 2, msg.From, msg.GasLimit, types.TipSetKey{}) if err != nil { return nil, xerrors.Errorf("estimating gas price: %w", err) } msg.GasPremium = gasPremium } if msg.GasFeeCap == types.EmptyInt || types.BigCmp(msg.GasFeeCap, types.NewInt(0)) == 0 { feeCap, err := a.GasEstimateFeeCap(ctx, msg, 10, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("estimating fee cap: %w", err) } msg.GasFeeCap = feeCap } return a.Mpool.PushWithNonce(ctx, msg.From, func(from address.Address, nonce uint64) (*types.SignedMessage, error) { msg.Nonce = nonce if msg.From.Protocol() == address.ID { log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, from) msg.From = from } b, err := a.WalletBalance(ctx, msg.From) if err != nil { return nil, xerrors.Errorf("mpool push: getting origin balance: %w", err) } if b.LessThan(msg.Value) { return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value) } return a.WalletSignMessage(ctx, from, msg) }) } func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { return a.Mpool.GetNonce(addr) } func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) { return a.Mpool.Updates(ctx) }