diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 0d2f6f950..d8f932041 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -83,7 +83,7 @@ type Matcher struct { retrievals chan chan *Retrieval // Retriever processes waiting for task allocations deliveries chan *Retrieval // Retriever processes waiting for task response deliveries - running uint32 // Atomic flag whether a session is live or not + running atomic.Bool // Atomic flag whether a session is live or not } // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing @@ -146,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) { // channel is closed. func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { // Make sure we're not creating concurrent sessions - if atomic.SwapUint32(&m.running, 1) == 1 { + if m.running.Swap(true) { return nil, errors.New("matcher already running") } - defer atomic.StoreUint32(&m.running, 0) + defer m.running.Store(false) // Initiate a new matching round session := &MatcherSession{ diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index 93d4632b8..36764c3f1 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -160,7 +160,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in } } // Track the number of retrieval requests made - var requested uint32 + var requested atomic.Uint32 // Start the matching session for the filter and the retriever goroutines quit := make(chan struct{}) @@ -208,15 +208,15 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in session.Close() close(quit) - if retrievals != 0 && requested != retrievals { - t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals) + if retrievals != 0 && requested.Load() != retrievals { + t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals) } - return requested + return requested.Load() } // startRetrievers starts a batch of goroutines listening for section requests // and serving them. -func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) { +func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) { requests := make(chan chan *Retrieval) for i := 0; i < 10; i++ { @@ -238,7 +238,7 @@ func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *ui for i, section := range task.Sections { if rand.Int()%4 != 0 { // Handle occasional missing deliveries task.Bitsets[i] = generateBitset(task.Bit, section) - atomic.AddUint32(retrievals, 1) + retrievals.Add(1) } } request <- task diff --git a/core/bloombits/scheduler_test.go b/core/bloombits/scheduler_test.go index 49e113c11..dcaaa9152 100644 --- a/core/bloombits/scheduler_test.go +++ b/core/bloombits/scheduler_test.go @@ -45,13 +45,13 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) { fetch := make(chan *request, 16) defer close(fetch) - var delivered uint32 + var delivered atomic.Uint32 for i := 0; i < fetchers; i++ { go func() { defer fetchPend.Done() for req := range fetch { - atomic.AddUint32(&delivered, 1) + delivered.Add(1) f.deliver([]uint64{ req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds) @@ -97,7 +97,7 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) { } pend.Wait() - if have := atomic.LoadUint32(&delivered); int(have) != requests { + if have := delivered.Load(); int(have) != requests { t.Errorf("request count mismatch: have %v, want %v", have, requests) } }