cap MaxNonceGap to 4, add delay between batch messages during republish
This commit is contained in:
parent
f53d2e3a46
commit
28f57667f0
@ -7,7 +7,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
stdbig "math/big"
|
stdbig "math/big"
|
||||||
"runtime"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -53,7 +52,7 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
|
|||||||
|
|
||||||
var MaxActorPendingMessages = 1000
|
var MaxActorPendingMessages = 1000
|
||||||
|
|
||||||
var MaxNonceGap = uint64(16)
|
var MaxNonceGap = uint64(4)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrMessageTooBig = errors.New("message too big")
|
ErrMessageTooBig = errors.New("message too big")
|
||||||
@ -82,13 +81,6 @@ const (
|
|||||||
localUpdates = "update"
|
localUpdates = "update"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
numcpus := uint64(runtime.NumCPU())
|
|
||||||
if numcpus < MaxNonceGap {
|
|
||||||
MaxNonceGap = numcpus
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type MessagePool struct {
|
type MessagePool struct {
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package messagepool
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -15,6 +16,8 @@ import (
|
|||||||
|
|
||||||
const repubMsgLimit = 30
|
const repubMsgLimit = 30
|
||||||
|
|
||||||
|
var RepublishBatchDelay = 200 * time.Millisecond
|
||||||
|
|
||||||
func (mp *MessagePool) republishPendingMessages() error {
|
func (mp *MessagePool) republishPendingMessages() error {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
ts := mp.curTs
|
ts := mp.curTs
|
||||||
@ -131,6 +134,12 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
count++
|
count++
|
||||||
|
|
||||||
|
if count < len(msgs) {
|
||||||
|
// this delay is here to encourage the pubsub subsystem to process the messages serially
|
||||||
|
// and avoid creating nonce gaps because of concurrent validation.
|
||||||
|
time.Sleep(RepublishBatchDelay)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// track most recently republished messages
|
// track most recently republished messages
|
||||||
|
@ -12,6 +12,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestRepubMessages(t *testing.T) {
|
func TestRepubMessages(t *testing.T) {
|
||||||
|
oldRepublishBatchDelay = RepublishBatchDelay
|
||||||
|
RepublishBatchDelay = time.Microsecond
|
||||||
|
defer func() {
|
||||||
|
RepublishBatchDelay = oldRepublishBatchDelay
|
||||||
|
}()
|
||||||
|
|
||||||
tma := newTestMpoolAPI()
|
tma := newTestMpoolAPI()
|
||||||
ds := datastore.NewMapDatastore()
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user