add semaphore in push and friends to reduce lock contention
This commit is contained in:
parent
472e502218
commit
7be18df6ea
@ -68,6 +68,8 @@ type MessagePool struct {
|
|||||||
|
|
||||||
ds dtypes.MetadataDS
|
ds dtypes.MetadataDS
|
||||||
|
|
||||||
|
addSema chan struct{}
|
||||||
|
|
||||||
closer chan struct{}
|
closer chan struct{}
|
||||||
repubTk *clock.Ticker
|
repubTk *clock.Ticker
|
||||||
|
|
||||||
@ -158,6 +160,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
|||||||
|
|
||||||
mp := &MessagePool{
|
mp := &MessagePool{
|
||||||
ds: ds,
|
ds: ds,
|
||||||
|
addSema: make(chan struct{}, 1),
|
||||||
closer: make(chan struct{}),
|
closer: make(chan struct{}),
|
||||||
repubTk: build.Clock.Ticker(RepublishInterval),
|
repubTk: build.Clock.Ticker(RepublishInterval),
|
||||||
localAddrs: make(map[address.Address]struct{}),
|
localAddrs: make(map[address.Address]struct{}),
|
||||||
@ -251,6 +254,12 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||||
|
// serialize push access to reduce lock contention
|
||||||
|
mp.addSema <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-mp.addSema
|
||||||
|
}()
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
epoch := mp.curTs.Height()
|
epoch := mp.curTs.Height()
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
@ -296,6 +305,12 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serialize push access to reduce lock contention
|
||||||
|
mp.addSema <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-mp.addSema
|
||||||
|
}()
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
return mp.addTs(m, mp.curTs)
|
return mp.addTs(m, mp.curTs)
|
||||||
@ -494,6 +509,12 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
||||||
|
// serialize push access to reduce lock contention
|
||||||
|
mp.addSema <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
<-mp.addSema
|
||||||
|
}()
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user