cache bls signatures for later recovery during forks
This commit is contained in:
parent
aafccaf021
commit
a702a5678c
@ -123,3 +123,7 @@ func init() {
|
|||||||
|
|
||||||
// Sync
|
// Sync
|
||||||
const BadBlockCacheSize = 8192
|
const BadBlockCacheSize = 8192
|
||||||
|
|
||||||
|
// assuming 4000 blocks per round, this lets us not lose any messages across a
|
||||||
|
// 10 block reorg.
|
||||||
|
const BlsSignatureCacheSize = 40000
|
||||||
|
@ -4,10 +4,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -38,6 +40,8 @@ type MessagePool struct {
|
|||||||
minGasPrice types.BigInt
|
minGasPrice types.BigInt
|
||||||
|
|
||||||
maxTxPoolSize int
|
maxTxPoolSize int
|
||||||
|
|
||||||
|
blsSigCache *lru.TwoQueueCache
|
||||||
}
|
}
|
||||||
|
|
||||||
type msgSet struct {
|
type msgSet struct {
|
||||||
@ -68,12 +72,14 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
|
||||||
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
pending: make(map[address.Address]*msgSet),
|
pending: make(map[address.Address]*msgSet),
|
||||||
sm: sm,
|
sm: sm,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
minGasPrice: types.NewInt(0),
|
minGasPrice: types.NewInt(0),
|
||||||
maxTxPoolSize: 100000,
|
maxTxPoolSize: 100000,
|
||||||
|
blsSigCache: cache,
|
||||||
}
|
}
|
||||||
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
|
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
|
||||||
|
|
||||||
@ -138,6 +144,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|||||||
|
|
||||||
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
||||||
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
|
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
|
||||||
|
if m.Signature.Type == types.KTBLS {
|
||||||
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil {
|
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
@ -310,5 +319,18 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
|
|
||||||
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
||||||
// TODO: persist signatures for BLS messages for a little while in case of reorgs
|
// TODO: persist signatures for BLS messages for a little while in case of reorgs
|
||||||
return nil
|
val, ok := mp.blsSigCache.Get(msg.Cid())
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sig, ok := val.(*types.Signature)
|
||||||
|
if !ok {
|
||||||
|
log.Warnf("value in signature cache was not a signature (got %T)", val)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &types.SignedMessage{
|
||||||
|
Message: *msg,
|
||||||
|
Signature: *sig,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user