forked from cerc-io/plugeth
eth: use new atomic types (#27137)
This commit is contained in:
parent
bbcb5ea37b
commit
f541cad272
@ -23,7 +23,6 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/accounts"
|
"github.com/ethereum/go-ethereum/accounts"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -437,7 +436,7 @@ func (s *Ethereum) StartMining(threads int) error {
|
|||||||
}
|
}
|
||||||
// If mining is started, we can disable the transaction rejection mechanism
|
// If mining is started, we can disable the transaction rejection mechanism
|
||||||
// introduced to speed sync times.
|
// introduced to speed sync times.
|
||||||
atomic.StoreUint32(&s.handler.acceptTxs, 1)
|
s.handler.acceptTxs.Store(true)
|
||||||
|
|
||||||
go s.miner.Start()
|
go s.miner.Start()
|
||||||
}
|
}
|
||||||
@ -469,8 +468,8 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine }
|
|||||||
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
|
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
|
||||||
func (s *Ethereum) IsListening() bool { return true } // Always listening
|
func (s *Ethereum) IsListening() bool { return true } // Always listening
|
||||||
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
|
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
|
||||||
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 }
|
func (s *Ethereum) Synced() bool { return s.handler.acceptTxs.Load() }
|
||||||
func (s *Ethereum) SetSynced() { atomic.StoreUint32(&s.handler.acceptTxs, 1) }
|
func (s *Ethereum) SetSynced() { s.handler.acceptTxs.Store(true) }
|
||||||
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
|
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
|
||||||
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
|
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
|
||||||
func (s *Ethereum) Merger() *consensus.Merger { return s.merger }
|
func (s *Ethereum) Merger() *consensus.Merger { return s.merger }
|
||||||
|
@ -413,13 +413,13 @@ func testConcurrentAnnouncements(t *testing.T, light bool) {
|
|||||||
secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
|
secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
|
||||||
secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
|
secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
|
||||||
|
|
||||||
counter := uint32(0)
|
var counter atomic.Uint32
|
||||||
firstHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
firstHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
||||||
atomic.AddUint32(&counter, 1)
|
counter.Add(1)
|
||||||
return firstHeaderFetcher(hash, sink)
|
return firstHeaderFetcher(hash, sink)
|
||||||
}
|
}
|
||||||
secondHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
secondHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
||||||
atomic.AddUint32(&counter, 1)
|
counter.Add(1)
|
||||||
return secondHeaderFetcher(hash, sink)
|
return secondHeaderFetcher(hash, sink)
|
||||||
}
|
}
|
||||||
// Iteratively announce blocks until all are imported
|
// Iteratively announce blocks until all are imported
|
||||||
@ -446,8 +446,8 @@ func testConcurrentAnnouncements(t *testing.T, light bool) {
|
|||||||
verifyImportDone(t, imported)
|
verifyImportDone(t, imported)
|
||||||
|
|
||||||
// Make sure no blocks were retrieved twice
|
// Make sure no blocks were retrieved twice
|
||||||
if int(counter) != targetBlocks {
|
if c := int(counter.Load()); c != targetBlocks {
|
||||||
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
|
t.Fatalf("retrieval count mismatch: have %v, want %v", c, targetBlocks)
|
||||||
}
|
}
|
||||||
verifyChainHeight(t, tester, uint64(len(hashes)-1))
|
verifyChainHeight(t, tester, uint64(len(hashes)-1))
|
||||||
}
|
}
|
||||||
@ -513,9 +513,9 @@ func testPendingDeduplication(t *testing.T, light bool) {
|
|||||||
bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
|
bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
|
||||||
|
|
||||||
delay := 50 * time.Millisecond
|
delay := 50 * time.Millisecond
|
||||||
counter := uint32(0)
|
var counter atomic.Uint32
|
||||||
headerWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
headerWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
||||||
atomic.AddUint32(&counter, 1)
|
counter.Add(1)
|
||||||
|
|
||||||
// Simulate a long running fetch
|
// Simulate a long running fetch
|
||||||
resink := make(chan *eth.Response)
|
resink := make(chan *eth.Response)
|
||||||
@ -545,8 +545,8 @@ func testPendingDeduplication(t *testing.T, light bool) {
|
|||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
|
|
||||||
// Check that all blocks were imported and none fetched twice
|
// Check that all blocks were imported and none fetched twice
|
||||||
if int(counter) != 1 {
|
if c := counter.Load(); c != 1 {
|
||||||
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
|
t.Fatalf("retrieval count mismatch: have %v, want %v", c, 1)
|
||||||
}
|
}
|
||||||
verifyChainHeight(t, tester, 1)
|
verifyChainHeight(t, tester, 1)
|
||||||
}
|
}
|
||||||
@ -632,9 +632,9 @@ func TestImportDeduplication(t *testing.T) {
|
|||||||
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
|
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
|
||||||
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
|
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
|
||||||
|
|
||||||
counter := uint32(0)
|
var counter atomic.Uint32
|
||||||
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
|
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
|
||||||
atomic.AddUint32(&counter, uint32(len(blocks)))
|
counter.Add(uint32(len(blocks)))
|
||||||
return tester.insertChain(blocks)
|
return tester.insertChain(blocks)
|
||||||
}
|
}
|
||||||
// Instrument the fetching and imported events
|
// Instrument the fetching and imported events
|
||||||
@ -655,8 +655,8 @@ func TestImportDeduplication(t *testing.T) {
|
|||||||
tester.fetcher.Enqueue("valid", blocks[hashes[1]])
|
tester.fetcher.Enqueue("valid", blocks[hashes[1]])
|
||||||
verifyImportCount(t, imported, 2)
|
verifyImportCount(t, imported, 2)
|
||||||
|
|
||||||
if counter != 2 {
|
if c := counter.Load(); c != 2 {
|
||||||
t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
|
t.Fatalf("import invocation count mismatch: have %v, want %v", c, 2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -853,13 +853,13 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
|
|||||||
// Create a tester with instrumented import hooks
|
// Create a tester with instrumented import hooks
|
||||||
tester := newTester(false)
|
tester := newTester(false)
|
||||||
|
|
||||||
imported, announces := make(chan interface{}), int32(0)
|
imported, announces := make(chan interface{}), atomic.Int32{}
|
||||||
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
|
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
|
||||||
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
|
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
|
||||||
if added {
|
if added {
|
||||||
atomic.AddInt32(&announces, 1)
|
announces.Add(1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&announces, -1)
|
announces.Add(-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Create a valid chain and an infinite junk chain
|
// Create a valid chain and an infinite junk chain
|
||||||
@ -879,7 +879,7 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
|
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
|
||||||
}
|
}
|
||||||
if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
|
if count := announces.Load(); count != hashLimit+maxQueueDist {
|
||||||
t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
|
t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
|
||||||
}
|
}
|
||||||
// Wait for fetches to complete
|
// Wait for fetches to complete
|
||||||
@ -900,13 +900,13 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||||||
// Create a tester with instrumented import hooks
|
// Create a tester with instrumented import hooks
|
||||||
tester := newTester(false)
|
tester := newTester(false)
|
||||||
|
|
||||||
imported, enqueued := make(chan interface{}), int32(0)
|
imported, enqueued := make(chan interface{}), atomic.Int32{}
|
||||||
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
|
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
|
||||||
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
|
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
|
||||||
if added {
|
if added {
|
||||||
atomic.AddInt32(&enqueued, 1)
|
enqueued.Add(1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&enqueued, -1)
|
enqueued.Add(-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Create a valid chain and a batch of dangling (but in range) blocks
|
// Create a valid chain and a batch of dangling (but in range) blocks
|
||||||
@ -924,7 +924,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||||||
tester.fetcher.Enqueue("attacker", block)
|
tester.fetcher.Enqueue("attacker", block)
|
||||||
}
|
}
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
|
if queued := enqueued.Load(); queued != blockLimit {
|
||||||
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
|
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
|
||||||
}
|
}
|
||||||
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
|
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
|
||||||
@ -932,7 +932,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||||||
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
|
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
|
||||||
}
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
|
if queued := enqueued.Load(); queued != blockLimit+maxQueueDist-1 {
|
||||||
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
|
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
|
||||||
}
|
}
|
||||||
// Insert the missing piece (and sanity check the import)
|
// Insert the missing piece (and sanity check the import)
|
||||||
|
@ -251,10 +251,10 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks uint64, unresolvedL
|
|||||||
}
|
}
|
||||||
oldestBlock := lastBlock + 1 - blocks
|
oldestBlock := lastBlock + 1 - blocks
|
||||||
|
|
||||||
var (
|
var next atomic.Uint64
|
||||||
next = oldestBlock
|
next.Store(oldestBlock)
|
||||||
results = make(chan *blockFees, blocks)
|
results := make(chan *blockFees, blocks)
|
||||||
)
|
|
||||||
percentileKey := make([]byte, 8*len(rewardPercentiles))
|
percentileKey := make([]byte, 8*len(rewardPercentiles))
|
||||||
for i, p := range rewardPercentiles {
|
for i, p := range rewardPercentiles {
|
||||||
binary.LittleEndian.PutUint64(percentileKey[i*8:(i+1)*8], math.Float64bits(p))
|
binary.LittleEndian.PutUint64(percentileKey[i*8:(i+1)*8], math.Float64bits(p))
|
||||||
@ -263,7 +263,7 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks uint64, unresolvedL
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
// Retrieve the next block number to fetch with this goroutine
|
// Retrieve the next block number to fetch with this goroutine
|
||||||
blockNumber := atomic.AddUint64(&next, 1) - 1
|
blockNumber := next.Add(1) - 1
|
||||||
if blockNumber > lastBlock {
|
if blockNumber > lastBlock {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -91,8 +91,8 @@ type handler struct {
|
|||||||
networkID uint64
|
networkID uint64
|
||||||
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
|
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
|
||||||
|
|
||||||
snapSync uint32 // Flag whether snap sync is enabled (gets disabled if we already have blocks)
|
snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
|
||||||
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
|
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
|
||||||
|
|
||||||
database ethdb.Database
|
database ethdb.Database
|
||||||
txpool txPool
|
txpool txPool
|
||||||
@ -149,7 +149,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
// In these cases however it's safe to reenable snap sync.
|
// In these cases however it's safe to reenable snap sync.
|
||||||
fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock()
|
fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock()
|
||||||
if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
|
if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
|
||||||
h.snapSync = uint32(1)
|
h.snapSync.Store(true)
|
||||||
log.Warn("Switch sync mode from full sync to snap sync")
|
log.Warn("Switch sync mode from full sync to snap sync")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -158,7 +158,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
log.Warn("Switch sync mode from snap sync to full sync")
|
log.Warn("Switch sync mode from snap sync to full sync")
|
||||||
} else {
|
} else {
|
||||||
// If snap sync was requested and our database is empty, grant it
|
// If snap sync was requested and our database is empty, grant it
|
||||||
h.snapSync = uint32(1)
|
h.snapSync.Store(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If sync succeeds, pass a callback to potentially disable snap sync mode
|
// If sync succeeds, pass a callback to potentially disable snap sync mode
|
||||||
@ -166,13 +166,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
success := func() {
|
success := func() {
|
||||||
// If we were running snap sync and it finished, disable doing another
|
// If we were running snap sync and it finished, disable doing another
|
||||||
// round on next sync cycle
|
// round on next sync cycle
|
||||||
if atomic.LoadUint32(&h.snapSync) == 1 {
|
if h.snapSync.Load() {
|
||||||
log.Info("Snap sync complete, auto disabling")
|
log.Info("Snap sync complete, auto disabling")
|
||||||
atomic.StoreUint32(&h.snapSync, 0)
|
h.snapSync.Store(false)
|
||||||
}
|
}
|
||||||
// If we've successfully finished a sync cycle, accept transactions from
|
// If we've successfully finished a sync cycle, accept transactions from
|
||||||
// the network
|
// the network
|
||||||
atomic.StoreUint32(&h.acceptTxs, 1)
|
h.acceptTxs.Store(true)
|
||||||
}
|
}
|
||||||
// Construct the downloader (long sync)
|
// Construct the downloader (long sync)
|
||||||
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
|
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
|
||||||
@ -232,7 +232,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
// accept each others' blocks until a restart. Unfortunately we haven't figured
|
// accept each others' blocks until a restart. Unfortunately we haven't figured
|
||||||
// out a way yet where nodes can decide unilaterally whether the network is new
|
// out a way yet where nodes can decide unilaterally whether the network is new
|
||||||
// or not. This should be fixed if we figure out a solution.
|
// or not. This should be fixed if we figure out a solution.
|
||||||
if atomic.LoadUint32(&h.snapSync) == 1 {
|
if h.snapSync.Load() {
|
||||||
log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
|
log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
@ -261,7 +261,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
|||||||
}
|
}
|
||||||
n, err := h.chain.InsertChain(blocks)
|
n, err := h.chain.InsertChain(blocks)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
|
h.acceptTxs.Store(true) // Mark initial sync done on any fetcher import
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
@ -310,7 +310,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
reject := false // reserved peer slots
|
reject := false // reserved peer slots
|
||||||
if atomic.LoadUint32(&h.snapSync) == 1 {
|
if h.snapSync.Load() {
|
||||||
if snap == nil {
|
if snap == nil {
|
||||||
// If we are running snap-sync, we want to reserve roughly half the peer
|
// If we are running snap-sync, we want to reserve roughly half the peer
|
||||||
// slots for peers supporting the snap protocol.
|
// slots for peers supporting the snap protocol.
|
||||||
|
@ -19,7 +19,6 @@ package eth
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -52,7 +51,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
|
|||||||
// AcceptTxs retrieves whether transaction processing is enabled on the node
|
// AcceptTxs retrieves whether transaction processing is enabled on the node
|
||||||
// or if inbound transactions should simply be dropped.
|
// or if inbound transactions should simply be dropped.
|
||||||
func (h *ethHandler) AcceptTxs() bool {
|
func (h *ethHandler) AcceptTxs() bool {
|
||||||
return atomic.LoadUint32(&h.acceptTxs) == 1
|
return h.acceptTxs.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle is invoked from a peer's message handler when it receives a new remote
|
// Handle is invoked from a peer's message handler when it receives a new remote
|
||||||
|
@ -248,7 +248,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
|
|||||||
handler := newTestHandler()
|
handler := newTestHandler()
|
||||||
defer handler.close()
|
defer handler.close()
|
||||||
|
|
||||||
handler.handler.acceptTxs = 1 // mark synced to accept transactions
|
handler.handler.acceptTxs.Store(true) // mark synced to accept transactions
|
||||||
|
|
||||||
txs := make(chan core.NewTxsEvent)
|
txs := make(chan core.NewTxsEvent)
|
||||||
sub := handler.txpool.SubscribeNewTxsEvent(txs)
|
sub := handler.txpool.SubscribeNewTxsEvent(txs)
|
||||||
@ -394,7 +394,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
|
|||||||
// to receive them. We need multiple sinks since a one-to-one peering would
|
// to receive them. We need multiple sinks since a one-to-one peering would
|
||||||
// broadcast all transactions without announcement.
|
// broadcast all transactions without announcement.
|
||||||
source := newTestHandler()
|
source := newTestHandler()
|
||||||
source.handler.snapSync = 0 // Avoid requiring snap, otherwise some will be dropped below
|
source.handler.snapSync.Store(false) // Avoid requiring snap, otherwise some will be dropped below
|
||||||
defer source.close()
|
defer source.close()
|
||||||
|
|
||||||
sinks := make([]*testHandler, 10)
|
sinks := make([]*testHandler, 10)
|
||||||
@ -402,7 +402,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
|
|||||||
sinks[i] = newTestHandler()
|
sinks[i] = newTestHandler()
|
||||||
defer sinks[i].close()
|
defer sinks[i].close()
|
||||||
|
|
||||||
sinks[i].handler.acceptTxs = 1 // mark synced to accept transactions
|
sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions
|
||||||
}
|
}
|
||||||
// Interconnect all the sink handlers with the source handler
|
// Interconnect all the sink handlers with the source handler
|
||||||
for i, sink := range sinks {
|
for i, sink := range sinks {
|
||||||
|
@ -449,10 +449,10 @@ type Syncer struct {
|
|||||||
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
|
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
|
||||||
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
|
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
|
||||||
|
|
||||||
trienodeHealRate float64 // Average heal rate for processing trie node data
|
trienodeHealRate float64 // Average heal rate for processing trie node data
|
||||||
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
|
trienodeHealPend atomic.Uint64 // Number of trie nodes currently pending for processing
|
||||||
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
|
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
|
||||||
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
|
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
|
||||||
|
|
||||||
trienodeHealSynced uint64 // Number of state trie nodes downloaded
|
trienodeHealSynced uint64 // Number of state trie nodes downloaded
|
||||||
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
|
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
|
||||||
@ -2189,7 +2189,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
|||||||
// HR(N) = (1-MI)^N*(OR-NR) + NR
|
// HR(N) = (1-MI)^N*(OR-NR) + NR
|
||||||
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
|
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
|
||||||
|
|
||||||
pending := atomic.LoadUint64(&s.trienodeHealPend)
|
pending := s.trienodeHealPend.Load()
|
||||||
if time.Since(s.trienodeHealThrottled) > time.Second {
|
if time.Since(s.trienodeHealThrottled) > time.Second {
|
||||||
// Periodically adjust the trie node throttler
|
// Periodically adjust the trie node throttler
|
||||||
if float64(pending) > 2*s.trienodeHealRate {
|
if float64(pending) > 2*s.trienodeHealRate {
|
||||||
@ -2776,9 +2776,9 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
|||||||
return errors.New("unexpected healing trienode")
|
return errors.New("unexpected healing trienode")
|
||||||
}
|
}
|
||||||
// Response validated, send it to the scheduler for filling
|
// Response validated, send it to the scheduler for filling
|
||||||
atomic.AddUint64(&s.trienodeHealPend, fills)
|
s.trienodeHealPend.Add(fills)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
|
s.trienodeHealPend.Add(^(fills - 1))
|
||||||
}()
|
}()
|
||||||
response := &trienodeHealResponse{
|
response := &trienodeHealResponse{
|
||||||
paths: req.paths,
|
paths: req.paths,
|
||||||
|
@ -19,7 +19,6 @@ package eth
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"math/big"
|
"math/big"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -205,7 +204,7 @@ func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
|
|||||||
|
|
||||||
func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
|
func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
|
||||||
// If we're in snap sync mode, return that directly
|
// If we're in snap sync mode, return that directly
|
||||||
if atomic.LoadUint32(&cs.handler.snapSync) == 1 {
|
if cs.handler.snapSync.Load() {
|
||||||
block := cs.handler.chain.CurrentSnapBlock()
|
block := cs.handler.chain.CurrentSnapBlock()
|
||||||
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
|
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
|
||||||
return downloader.SnapSync, td
|
return downloader.SnapSync, td
|
||||||
@ -256,13 +255,13 @@ func (h *handler) doSync(op *chainSyncOp) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if atomic.LoadUint32(&h.snapSync) == 1 {
|
if h.snapSync.Load() {
|
||||||
log.Info("Snap sync complete, auto disabling")
|
log.Info("Snap sync complete, auto disabling")
|
||||||
atomic.StoreUint32(&h.snapSync, 0)
|
h.snapSync.Store(false)
|
||||||
}
|
}
|
||||||
// If we've successfully finished a sync cycle, enable accepting transactions
|
// If we've successfully finished a sync cycle, enable accepting transactions
|
||||||
// from the network.
|
// from the network.
|
||||||
atomic.StoreUint32(&h.acceptTxs, 1)
|
h.acceptTxs.Store(true)
|
||||||
|
|
||||||
head := h.chain.CurrentBlock()
|
head := h.chain.CurrentBlock()
|
||||||
if head.Number.Uint64() > 0 {
|
if head.Number.Uint64() > 0 {
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package eth
|
package eth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -39,14 +38,14 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
|
|||||||
|
|
||||||
// Create an empty handler and ensure it's in snap sync mode
|
// Create an empty handler and ensure it's in snap sync mode
|
||||||
empty := newTestHandler()
|
empty := newTestHandler()
|
||||||
if atomic.LoadUint32(&empty.handler.snapSync) == 0 {
|
if !empty.handler.snapSync.Load() {
|
||||||
t.Fatalf("snap sync disabled on pristine blockchain")
|
t.Fatalf("snap sync disabled on pristine blockchain")
|
||||||
}
|
}
|
||||||
defer empty.close()
|
defer empty.close()
|
||||||
|
|
||||||
// Create a full handler and ensure snap sync ends up disabled
|
// Create a full handler and ensure snap sync ends up disabled
|
||||||
full := newTestHandlerWithBlocks(1024)
|
full := newTestHandlerWithBlocks(1024)
|
||||||
if atomic.LoadUint32(&full.handler.snapSync) == 1 {
|
if full.handler.snapSync.Load() {
|
||||||
t.Fatalf("snap sync not disabled on non-empty blockchain")
|
t.Fatalf("snap sync not disabled on non-empty blockchain")
|
||||||
}
|
}
|
||||||
defer full.close()
|
defer full.close()
|
||||||
@ -91,7 +90,7 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
|
|||||||
if err := empty.handler.doSync(op); err != nil {
|
if err := empty.handler.doSync(op); err != nil {
|
||||||
t.Fatal("sync failed:", err)
|
t.Fatal("sync failed:", err)
|
||||||
}
|
}
|
||||||
if atomic.LoadUint32(&empty.handler.snapSync) == 1 {
|
if empty.handler.snapSync.Load() {
|
||||||
t.Fatalf("snap sync not disabled after successful synchronisation")
|
t.Fatalf("snap sync not disabled after successful synchronisation")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user