diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 556c20198..7fc4f2b9d 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -7,7 +7,6 @@ import ( "fmt" "math" stdbig "math/big" - "runtime" "sort" "sync" "time" @@ -53,7 +52,7 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee)) var MaxActorPendingMessages = 1000 -var MaxNonceGap = uint64(16) +var MaxNonceGap = uint64(4) var ( ErrMessageTooBig = errors.New("message too big") @@ -82,13 +81,6 @@ const ( localUpdates = "update" ) -func init() { - numcpus := uint64(runtime.NumCPU()) - if numcpus < MaxNonceGap { - MaxNonceGap = numcpus - } -} - type MessagePool struct { lk sync.Mutex diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index acbf23892..1173bdb48 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -3,6 +3,7 @@ package messagepool import ( "context" "sort" + "time" "golang.org/x/xerrors" @@ -15,6 +16,8 @@ import ( const repubMsgLimit = 30 +var RepublishBatchDelay = 200 * time.Millisecond + func (mp *MessagePool) republishPendingMessages() error { mp.curTsLk.Lock() ts := mp.curTs @@ -131,6 +134,12 @@ func (mp *MessagePool) republishPendingMessages() error { } count++ + + if count < len(msgs) { + // this delay is here to encourage the pubsub subsystem to process the messages serially + // and avoid creating nonce gaps because of concurrent validation. + time.Sleep(RepublishBatchDelay) + } } // track most recently republished messages diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index 28a69c92a..703301601 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -12,6 +12,12 @@ import ( ) func TestRepubMessages(t *testing.T) { + oldRepublishBatchDelay = RepublishBatchDelay + RepublishBatchDelay = time.Microsecond + defer func() { + RepublishBatchDelay = oldRepublishBatchDelay + }() + tma := newTestMpoolAPI() ds := datastore.NewMapDatastore()