Merge pull request #1757 from filecoin-project/feat/faster-mpool-add
cache signatures, and dont check them in a lock
This commit is contained in:
commit
5c73602d24
@ -109,6 +109,10 @@ const BadBlockCacheSize = 1 << 15
|
|||||||
// 10 block reorg.
|
// 10 block reorg.
|
||||||
const BlsSignatureCacheSize = 40000
|
const BlsSignatureCacheSize = 40000
|
||||||
|
|
||||||
|
// Size of signature verification cache
|
||||||
|
// 32k keeps the cache around 10MB in size, max
|
||||||
|
const VerifSigCacheSize = 32000
|
||||||
|
|
||||||
// ///////
|
// ///////
|
||||||
// Limits
|
// Limits
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
@ -87,6 +88,8 @@ type MessagePool struct {
|
|||||||
localMsgs datastore.Datastore
|
localMsgs datastore.Datastore
|
||||||
|
|
||||||
netName dtypes.NetworkName
|
netName dtypes.NetworkName
|
||||||
|
|
||||||
|
sigValCache *lru.TwoQueueCache
|
||||||
}
|
}
|
||||||
|
|
||||||
type msgSet struct {
|
type msgSet struct {
|
||||||
@ -180,6 +183,8 @@ func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
|
|||||||
|
|
||||||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
||||||
|
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
|
repubTk: time.NewTicker(build.BlockDelay * 10 * time.Second),
|
||||||
@ -188,6 +193,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
maxTxPoolSize: 5000,
|
maxTxPoolSize: 5000,
|
||||||
blsSigCache: cache,
|
blsSigCache: cache,
|
||||||
|
sigValCache: verifcache,
|
||||||
changes: lps.New(50),
|
changes: lps.New(50),
|
||||||
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
|
||||||
api: api,
|
api: api,
|
||||||
@ -313,12 +319,6 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||||
mp.curTsLk.Lock()
|
|
||||||
defer mp.curTsLk.Unlock()
|
|
||||||
return mp.addTs(m, mp.curTs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
|
||||||
// big messages are bad, anti DOS
|
// big messages are bad, anti DOS
|
||||||
if m.Size() > 32*1024 {
|
if m.Size() > 32*1024 {
|
||||||
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
||||||
@ -332,11 +332,53 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
|
|||||||
return ErrMessageValueTooHigh
|
return ErrMessageValueTooHigh
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sigs.Verify(&m.Signature, m.Message.From, m.Message.Cid().Bytes()); err != nil {
|
if err := mp.VerifyMsgSig(m); err != nil {
|
||||||
log.Warnf("mpooladd signature verification failed: %s", err)
|
log.Warnf("mpooladd signature verification failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
return mp.addTs(m, mp.curTs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sigCacheKey(m *types.SignedMessage) (string, error) {
|
||||||
|
switch m.Signature.Type {
|
||||||
|
case crypto.SigTypeBLS:
|
||||||
|
if len(m.Signature.Data) < 90 {
|
||||||
|
return "", fmt.Errorf("bls signature too short")
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(m.Cid().Bytes()) + string(m.Signature.Data[64:]), nil
|
||||||
|
case crypto.SigTypeSecp256k1:
|
||||||
|
return string(m.Cid().Bytes()), nil
|
||||||
|
default:
|
||||||
|
return "", xerrors.Errorf("unrecognized signature type: %d", m.Signature.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
|
||||||
|
sck, err := sigCacheKey(m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := mp.sigValCache.Get(sck)
|
||||||
|
if ok {
|
||||||
|
// already validated, great
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sigs.Verify(&m.Signature, m.Message.From, m.Message.Cid().Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.sigValCache.Add(sck, struct{}{})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway)
|
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway)
|
||||||
|
Loading…
Reference in New Issue
Block a user