MpoolPushUntrusted API for gateway

This commit is contained in:
vyzo 2020-09-18 09:40:43 +03:00
parent 4121063ca4
commit 3c72461969
5 changed files with 80 additions and 14 deletions

View File

@ -187,6 +187,9 @@ type FullNode interface {
// MpoolPush pushes a signed message to mempool. // MpoolPush pushes a signed message to mempool.
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
// MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error)
// MpoolPushMessage atomically assigns a nonce, signs, and pushes a message // MpoolPushMessage atomically assigns a nonce, signs, and pushes a message
// to mempool. // to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified // maxFee is only used when GasFeeCap/GasPremium fields aren't specified

View File

@ -120,7 +120,9 @@ type FullNodeStruct struct {
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolClear func(context.Context, bool) error `perm:"write"` MpoolClear func(context.Context, bool) error `perm:"write"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"` MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"` MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"` MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
@ -549,6 +551,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg) return c.Internal.MpoolPush(ctx, smsg)
} }
func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return c.Internal.MpoolPushUntrusted(ctx, smsg)
}
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
return c.Internal.MpoolPushMessage(ctx, msg, spec) return c.Internal.MpoolPushMessage(ctx, msg, spec)
} }

View File

@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10)
var baseFeeLowerBoundFactorConservative = types.NewInt(100) var baseFeeLowerBoundFactorConservative = types.NewInt(100)
var MaxActorPendingMessages = 1000 var MaxActorPendingMessages = 1000
var MaxUntrustedActorPendingMessages = 10
var MaxNonceGap = uint64(4) var MaxNonceGap = uint64(4)
@ -197,9 +198,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) {
msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap
} }
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) { func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) {
nextNonce := ms.nextNonce nextNonce := ms.nextNonce
nonceGap := false nonceGap := false
maxNonceGap := MaxNonceGap
maxActorPendingMessages := MaxActorPendingMessages
if untrusted {
maxNonceGap = 0
maxActorPendingMessages = MaxUntrustedActorPendingMessages
}
switch { switch {
case m.Message.Nonce == nextNonce: case m.Message.Nonce == nextNonce:
nextNonce++ nextNonce++
@ -208,7 +217,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
nextNonce++ nextNonce++
} }
case strict && m.Message.Nonce > nextNonce+MaxNonceGap: case strict && m.Message.Nonce > nextNonce+maxNonceGap:
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap) return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
case m.Message.Nonce > nextNonce: case m.Message.Nonce > nextNonce:
@ -244,7 +253,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) //ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
} }
if !has && strict && len(ms.msgs) > MaxActorPendingMessages { if !has && strict && len(ms.msgs) > maxActorPendingMessages {
log.Errorf("too many pending messages from actor %s", m.Message.From) log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages return false, ErrTooManyPendingMessages
} }
@ -486,7 +495,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
} }
mp.curTsLk.Lock() mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, true) publish, err := mp.addTs(m, mp.curTs, true, false)
if err != nil { if err != nil {
mp.curTsLk.Unlock() mp.curTsLk.Unlock()
return cid.Undef, err return cid.Undef, err
@ -553,7 +562,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock() mp.curTsLk.Lock()
defer mp.curTsLk.Unlock() defer mp.curTsLk.Unlock()
_, err = mp.addTs(m, mp.curTs, false) _, err = mp.addTs(m, mp.curTs, false, false)
return err return err
} }
@ -621,7 +630,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil return nil
} }
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs) snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil { if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
@ -643,7 +652,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local
return false, err return false, err
} }
return publish, mp.addLocked(m, !local) return publish, mp.addLocked(m, !local, untrusted)
} }
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
@ -678,17 +687,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
return err return err
} }
return mp.addLocked(m, false) return mp.addLocked(m, false, false)
} }
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
return mp.addLocked(m, false) return mp.addLocked(m, false, false)
} }
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS { if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature) mp.blsSigCache.Add(m.Cid(), m.Signature)
@ -715,7 +724,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
mp.pending[m.Message.From] = mset mp.pending[m.Message.From] = mset
} }
incr, err := mset.add(m, mp, strict) incr, err := mset.add(m, mp, strict, untrusted)
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
return err return err
@ -873,7 +882,7 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, err return nil, err
} }
if err := mp.addLocked(msg, false); err != nil { if err := mp.addLocked(msg, false, false); err != nil {
return nil, xerrors.Errorf("add locked failed: %w", err) return nil, xerrors.Errorf("add locked failed: %w", err)
} }
if err := mp.addLocal(msg, msgb); err != nil { if err := mp.addLocal(msg, msgb); err != nil {
@ -887,6 +896,50 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return msg, err return msg, err
} }
// this method is provided for the gateway to push messages.
// differences from Push:
// - strict checks are enabled
// - extra strict add checks are used when adding the messages to the msgSet
// that means: no nonce gaps, at most 10 pending messages for the actor
func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
}
// serialize push access to reduce lock contention
mp.addSema <- struct{}{}
defer func() {
<-mp.addSema
}()
msgb, err := m.Serialize()
if err != nil {
return cid.Undef, err
}
mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, false, true)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
}
mp.curTsLk.Unlock()
mp.lk.Lock()
if err := mp.addLocal(m, msgb); err != nil {
mp.lk.Unlock()
return cid.Undef, err
}
mp.lk.Unlock()
if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}
return m.Cid(), err
}
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()

View File

@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci
// TODO: additional anti-spam checks // TODO: additional anti-spam checks
return a.api.MpoolPush(ctx, sm) return a.api.MpoolPushUntrusted(ctx, sm)
} }

View File

@ -113,6 +113,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci
return a.Mpool.Push(smsg) return a.Mpool.Push(smsg)
} }
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.PushUntrusted(smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
inMsg := *msg inMsg := *msg
{ {