Merge pull request #3915 from filecoin-project/feat/mpool-gateway-api
MpoolPushUntrusted API for gateway
This commit is contained in:
commit
a3a3df9b30
@ -192,6 +192,9 @@ type FullNode interface {
|
|||||||
// MpoolPush pushes a signed message to mempool.
|
// MpoolPush pushes a signed message to mempool.
|
||||||
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
|
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
|
||||||
|
|
||||||
|
// MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
|
||||||
|
MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error)
|
||||||
|
|
||||||
// MpoolPushMessage atomically assigns a nonce, signs, and pushes a message
|
// MpoolPushMessage atomically assigns a nonce, signs, and pushes a message
|
||||||
// to mempool.
|
// to mempool.
|
||||||
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
|
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
|
||||||
|
@ -122,7 +122,9 @@ type FullNodeStruct struct {
|
|||||||
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
|
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
|
||||||
MpoolClear func(context.Context, bool) error `perm:"write"`
|
MpoolClear func(context.Context, bool) error `perm:"write"`
|
||||||
|
|
||||||
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
|
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
|
||||||
|
MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
|
||||||
|
|
||||||
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
|
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
|
||||||
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
|
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
|
||||||
MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
|
MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
|
||||||
@ -553,6 +555,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
|
|||||||
return c.Internal.MpoolPush(ctx, smsg)
|
return c.Internal.MpoolPush(ctx, smsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||||
|
return c.Internal.MpoolPushUntrusted(ctx, smsg)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||||
return c.Internal.MpoolPushMessage(ctx, msg, spec)
|
return c.Internal.MpoolPushMessage(ctx, msg, spec)
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10)
|
|||||||
var baseFeeLowerBoundFactorConservative = types.NewInt(100)
|
var baseFeeLowerBoundFactorConservative = types.NewInt(100)
|
||||||
|
|
||||||
var MaxActorPendingMessages = 1000
|
var MaxActorPendingMessages = 1000
|
||||||
|
var MaxUntrustedActorPendingMessages = 10
|
||||||
|
|
||||||
var MaxNonceGap = uint64(4)
|
var MaxNonceGap = uint64(4)
|
||||||
|
|
||||||
@ -195,9 +196,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) {
|
|||||||
msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap
|
msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) {
|
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) {
|
||||||
nextNonce := ms.nextNonce
|
nextNonce := ms.nextNonce
|
||||||
nonceGap := false
|
nonceGap := false
|
||||||
|
|
||||||
|
maxNonceGap := MaxNonceGap
|
||||||
|
maxActorPendingMessages := MaxActorPendingMessages
|
||||||
|
if untrusted {
|
||||||
|
maxNonceGap = 0
|
||||||
|
maxActorPendingMessages = MaxUntrustedActorPendingMessages
|
||||||
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case m.Message.Nonce == nextNonce:
|
case m.Message.Nonce == nextNonce:
|
||||||
nextNonce++
|
nextNonce++
|
||||||
@ -206,7 +215,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
|
|||||||
nextNonce++
|
nextNonce++
|
||||||
}
|
}
|
||||||
|
|
||||||
case strict && m.Message.Nonce > nextNonce+MaxNonceGap:
|
case strict && m.Message.Nonce > nextNonce+maxNonceGap:
|
||||||
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
|
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
|
||||||
|
|
||||||
case m.Message.Nonce > nextNonce:
|
case m.Message.Nonce > nextNonce:
|
||||||
@ -242,7 +251,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
|
|||||||
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
|
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !has && strict && len(ms.msgs) > MaxActorPendingMessages {
|
if !has && strict && len(ms.msgs) >= maxActorPendingMessages {
|
||||||
log.Errorf("too many pending messages from actor %s", m.Message.From)
|
log.Errorf("too many pending messages from actor %s", m.Message.From)
|
||||||
return false, ErrTooManyPendingMessages
|
return false, ErrTooManyPendingMessages
|
||||||
}
|
}
|
||||||
@ -484,7 +493,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
publish, err := mp.addTs(m, mp.curTs, true)
|
publish, err := mp.addTs(m, mp.curTs, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
@ -551,7 +560,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
_, err = mp.addTs(m, mp.curTs, false)
|
_, err = mp.addTs(m, mp.curTs, false, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -619,7 +628,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
||||||
@ -641,7 +650,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return publish, mp.addLocked(m, !local)
|
return publish, mp.addLocked(m, !local, untrusted)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
||||||
@ -676,17 +685,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return mp.addLocked(m, false)
|
return mp.addLocked(m, false, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.addLocked(m, false)
|
return mp.addLocked(m, false, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
|
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
|
||||||
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
|
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
|
||||||
if m.Signature.Type == crypto.SigTypeBLS {
|
if m.Signature.Type == crypto.SigTypeBLS {
|
||||||
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
||||||
@ -713,7 +722,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
|
|||||||
mp.pending[m.Message.From] = mset
|
mp.pending[m.Message.From] = mset
|
||||||
}
|
}
|
||||||
|
|
||||||
incr, err := mset.add(m, mp, strict)
|
incr, err := mset.add(m, mp, strict, untrusted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug(err)
|
log.Debug(err)
|
||||||
return err
|
return err
|
||||||
@ -793,6 +802,50 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
|
|||||||
return act.Balance, nil
|
return act.Balance, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this method is provided for the gateway to push messages.
|
||||||
|
// differences from Push:
|
||||||
|
// - strict checks are enabled
|
||||||
|
// - extra strict add checks are used when adding the messages to the msgSet
|
||||||
|
// that means: no nonce gaps, at most 10 pending messages for the actor
|
||||||
|
func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
|
||||||
|
err := mp.checkMessage(m)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// serialize push access to reduce lock contention
|
||||||
|
mp.addSema <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-mp.addSema
|
||||||
|
}()
|
||||||
|
|
||||||
|
msgb, err := m.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
publish, err := mp.addTs(m, mp.curTs, false, true)
|
||||||
|
if err != nil {
|
||||||
|
mp.curTsLk.Unlock()
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
mp.curTsLk.Unlock()
|
||||||
|
|
||||||
|
mp.lk.Lock()
|
||||||
|
if err := mp.addLocal(m, msgb); err != nil {
|
||||||
|
mp.lk.Unlock()
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
mp.lk.Unlock()
|
||||||
|
|
||||||
|
if publish {
|
||||||
|
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.Cid(), err
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
|
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci
|
|||||||
|
|
||||||
// TODO: additional anti-spam checks
|
// TODO: additional anti-spam checks
|
||||||
|
|
||||||
return a.api.MpoolPush(ctx, sm)
|
return a.api.MpoolPushUntrusted(ctx, sm)
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,7 @@
|
|||||||
* [MpoolPending](#MpoolPending)
|
* [MpoolPending](#MpoolPending)
|
||||||
* [MpoolPush](#MpoolPush)
|
* [MpoolPush](#MpoolPush)
|
||||||
* [MpoolPushMessage](#MpoolPushMessage)
|
* [MpoolPushMessage](#MpoolPushMessage)
|
||||||
|
* [MpoolPushUntrusted](#MpoolPushUntrusted)
|
||||||
* [MpoolSelect](#MpoolSelect)
|
* [MpoolSelect](#MpoolSelect)
|
||||||
* [MpoolSetConfig](#MpoolSetConfig)
|
* [MpoolSetConfig](#MpoolSetConfig)
|
||||||
* [MpoolSub](#MpoolSub)
|
* [MpoolSub](#MpoolSub)
|
||||||
@ -1779,6 +1780,43 @@ Response:
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### MpoolPushUntrusted
|
||||||
|
MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
|
||||||
|
|
||||||
|
|
||||||
|
Perms: write
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"Message": {
|
||||||
|
"Version": 42,
|
||||||
|
"To": "t01234",
|
||||||
|
"From": "t01234",
|
||||||
|
"Nonce": 42,
|
||||||
|
"Value": "0",
|
||||||
|
"GasLimit": 9,
|
||||||
|
"GasFeeCap": "0",
|
||||||
|
"GasPremium": "0",
|
||||||
|
"Method": 1,
|
||||||
|
"Params": "Ynl0ZSBhcnJheQ=="
|
||||||
|
},
|
||||||
|
"Signature": {
|
||||||
|
"Type": 2,
|
||||||
|
"Data": "Ynl0ZSBhcnJheQ=="
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Response:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### MpoolSelect
|
### MpoolSelect
|
||||||
MpoolSelect returns a list of pending messages for inclusion in the next block
|
MpoolSelect returns a list of pending messages for inclusion in the next block
|
||||||
|
|
||||||
|
@ -110,6 +110,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci
|
|||||||
return a.Mpool.Push(smsg)
|
return a.Mpool.Push(smsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||||
|
return a.Mpool.PushUntrusted(smsg)
|
||||||
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||||
cp := *msg
|
cp := *msg
|
||||||
msg = &cp
|
msg = &cp
|
||||||
|
Loading…
Reference in New Issue
Block a user