core: place a cap on reorglogs (#25711)
This PR makes the event-sending for deleted and new logs happen in batches, to prevent OOM situation due to large reorgs. Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
		
							parent
							
								
									610cf02c4a
								
							
						
					
					
						commit
						389021a5af
					
				| @ -2000,21 +2000,6 @@ func (bc *BlockChain) collectLogs(hash common.Hash, removed bool) []*types.Log { | ||||
| 	return logs | ||||
| } | ||||
| 
 | ||||
| // mergeLogs returns a merged log slice with specified sort order.
 | ||||
| func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log { | ||||
| 	var ret []*types.Log | ||||
| 	if reverse { | ||||
| 		for i := len(logs) - 1; i >= 0; i-- { | ||||
| 			ret = append(ret, logs[i]...) | ||||
| 		} | ||||
| 	} else { | ||||
| 		for i := 0; i < len(logs); i++ { | ||||
| 			ret = append(ret, logs[i]...) | ||||
| 		} | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
| 
 | ||||
| // reorg takes two blocks, an old chain and a new chain and will reconstruct the
 | ||||
| // blocks and inserts them to be part of the new canonical chain and accumulates
 | ||||
| // potential missing transactions and post an event about them.
 | ||||
| @ -2028,9 +2013,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | ||||
| 
 | ||||
| 		deletedTxs []common.Hash | ||||
| 		addedTxs   []common.Hash | ||||
| 
 | ||||
| 		deletedLogs [][]*types.Log | ||||
| 		rebirthLogs [][]*types.Log | ||||
| 	) | ||||
| 	// Reduce the longer chain to the same number as the shorter one
 | ||||
| 	if oldBlock.NumberU64() > newBlock.NumberU64() { | ||||
| @ -2040,12 +2022,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | ||||
| 			for _, tx := range oldBlock.Transactions() { | ||||
| 				deletedTxs = append(deletedTxs, tx.Hash()) | ||||
| 			} | ||||
| 
 | ||||
| 			// Collect deleted logs for notification
 | ||||
| 			logs := bc.collectLogs(oldBlock.Hash(), true) | ||||
| 			if len(logs) > 0 { | ||||
| 				deletedLogs = append(deletedLogs, logs) | ||||
| 			} | ||||
| 		} | ||||
| 	} else { | ||||
| 		// New chain is longer, stash all blocks away for subsequent insertion
 | ||||
| @ -2072,12 +2048,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | ||||
| 		for _, tx := range oldBlock.Transactions() { | ||||
| 			deletedTxs = append(deletedTxs, tx.Hash()) | ||||
| 		} | ||||
| 
 | ||||
| 		// Collect deleted logs for notification
 | ||||
| 		logs := bc.collectLogs(oldBlock.Hash(), true) | ||||
| 		if len(logs) > 0 { | ||||
| 			deletedLogs = append(deletedLogs, logs) | ||||
| 		} | ||||
| 		newChain = append(newChain, newBlock) | ||||
| 
 | ||||
| 		// Step back with both chains
 | ||||
| @ -2151,28 +2121,42 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | ||||
| 		log.Crit("Failed to delete useless indexes", "err", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Collect the logs
 | ||||
| 	for i := len(newChain) - 1; i >= 1; i-- { | ||||
| 		// Collect reborn logs due to chain reorg
 | ||||
| 		logs := bc.collectLogs(newChain[i].Hash(), false) | ||||
| 		if len(logs) > 0 { | ||||
| 			rebirthLogs = append(rebirthLogs, logs) | ||||
| 	// Send out events for logs from the old canon chain, and 'reborn'
 | ||||
| 	// logs from the new canon chain. The number of logs can be very
 | ||||
| 	// high, so the events are sent in batches of size around 512.
 | ||||
| 
 | ||||
| 	// Deleted logs + blocks:
 | ||||
| 	var deletedLogs []*types.Log | ||||
| 	for i := len(oldChain) - 1; i >= 0; i-- { | ||||
| 		// Also send event for blocks removed from the canon chain.
 | ||||
| 		bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) | ||||
| 
 | ||||
| 		// Collect deleted logs for notification
 | ||||
| 		if logs := bc.collectLogs(oldChain[i].Hash(), true); len(logs) > 0 { | ||||
| 			deletedLogs = append(deletedLogs, logs...) | ||||
| 		} | ||||
| 		if len(deletedLogs) > 512 { | ||||
| 			bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) | ||||
| 			deletedLogs = nil | ||||
| 		} | ||||
| 	} | ||||
| 	// If any logs need to be fired, do it now. In theory we could avoid creating
 | ||||
| 	// this goroutine if there are no events to fire, but realistcally that only
 | ||||
| 	// ever happens if we're reorging empty blocks, which will only happen on idle
 | ||||
| 	// networks where performance is not an issue either way.
 | ||||
| 	if len(deletedLogs) > 0 { | ||||
| 		bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) | ||||
| 		bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) | ||||
| 	} | ||||
| 
 | ||||
| 	// New logs:
 | ||||
| 	var rebirthLogs []*types.Log | ||||
| 	for i := len(newChain) - 1; i >= 1; i-- { | ||||
| 		if logs := bc.collectLogs(newChain[i].Hash(), false); len(logs) > 0 { | ||||
| 			rebirthLogs = append(rebirthLogs, logs...) | ||||
| 		} | ||||
| 		if len(rebirthLogs) > 512 { | ||||
| 			bc.logsFeed.Send(rebirthLogs) | ||||
| 			rebirthLogs = nil | ||||
| 		} | ||||
| 	} | ||||
| 	if len(rebirthLogs) > 0 { | ||||
| 		bc.logsFeed.Send(mergeLogs(rebirthLogs, false)) | ||||
| 	} | ||||
| 	if len(oldChain) > 0 { | ||||
| 		for i := len(oldChain) - 1; i >= 0; i-- { | ||||
| 			bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) | ||||
| 		} | ||||
| 		bc.logsFeed.Send(rebirthLogs) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -1158,37 +1158,53 @@ func TestLogRebirth(t *testing.T) { | ||||
| 	blockchain.SubscribeLogsEvent(newLogCh) | ||||
| 	blockchain.SubscribeRemovedLogsEvent(rmLogsCh) | ||||
| 
 | ||||
| 	// This chain contains a single log.
 | ||||
| 	genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) { | ||||
| 		if i == 1 { | ||||
| 			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("failed to create tx: %v", err) | ||||
| 	// This chain contains 10 logs.
 | ||||
| 	genDb, chain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) { | ||||
| 		if i < 2 { | ||||
| 			for ii := 0; ii < 5; ii++ { | ||||
| 				tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{ | ||||
| 					Nonce:    gen.TxNonce(addr1), | ||||
| 					GasPrice: gen.header.BaseFee, | ||||
| 					Gas:      uint64(1000001), | ||||
| 					Data:     logCode, | ||||
| 				}) | ||||
| 				if err != nil { | ||||
| 					t.Fatalf("failed to create tx: %v", err) | ||||
| 				} | ||||
| 				gen.AddTx(tx) | ||||
| 			} | ||||
| 			gen.AddTx(tx) | ||||
| 		} | ||||
| 	}) | ||||
| 	if _, err := blockchain.InsertChain(chain); err != nil { | ||||
| 		t.Fatalf("failed to insert chain: %v", err) | ||||
| 	} | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 1, 0) | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 10, 0) | ||||
| 
 | ||||
| 	// Generate long reorg chain containing another log. Inserting the
 | ||||
| 	// chain removes one log and adds one.
 | ||||
| 	_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 2, func(i int, gen *BlockGen) { | ||||
| 		if i == 1 { | ||||
| 			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, gen.header.BaseFee, logCode), signer, key1) | ||||
| 	// Generate long reorg chain containing more logs. Inserting the
 | ||||
| 	// chain removes one log and adds four.
 | ||||
| 	_, forkChain, _ := GenerateChainWithGenesis(gspec, engine, 3, func(i int, gen *BlockGen) { | ||||
| 		if i == 2 { | ||||
| 			// The last (head) block is not part of the reorg-chain, we can ignore it
 | ||||
| 			return | ||||
| 		} | ||||
| 		for ii := 0; ii < 5; ii++ { | ||||
| 			tx, err := types.SignNewTx(key1, signer, &types.LegacyTx{ | ||||
| 				Nonce:    gen.TxNonce(addr1), | ||||
| 				GasPrice: gen.header.BaseFee, | ||||
| 				Gas:      uint64(1000000), | ||||
| 				Data:     logCode, | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("failed to create tx: %v", err) | ||||
| 			} | ||||
| 			gen.AddTx(tx) | ||||
| 			gen.OffsetTime(-9) // higher block difficulty
 | ||||
| 		} | ||||
| 		gen.OffsetTime(-9) // higher block difficulty
 | ||||
| 	}) | ||||
| 	if _, err := blockchain.InsertChain(forkChain); err != nil { | ||||
| 		t.Fatalf("failed to insert forked chain: %v", err) | ||||
| 	} | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 1, 1) | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 10, 10) | ||||
| 
 | ||||
| 	// This chain segment is rooted in the original chain, but doesn't contain any logs.
 | ||||
| 	// When inserting it, the canonical chain switches away from forkChain and re-emits
 | ||||
| @ -1197,7 +1213,7 @@ func TestLogRebirth(t *testing.T) { | ||||
| 	if _, err := blockchain.InsertChain(newBlocks); err != nil { | ||||
| 		t.Fatalf("failed to insert forked chain: %v", err) | ||||
| 	} | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 1, 1) | ||||
| 	checkLogEvents(t, newLogCh, rmLogsCh, 10, 10) | ||||
| } | ||||
| 
 | ||||
| // This test is a variation of TestLogRebirth. It verifies that log events are emitted
 | ||||
| @ -1252,19 +1268,43 @@ func TestSideLogRebirth(t *testing.T) { | ||||
| 
 | ||||
| func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan RemovedLogsEvent, wantNew, wantRemoved int) { | ||||
| 	t.Helper() | ||||
| 
 | ||||
| 	if len(logsCh) != wantNew { | ||||
| 		t.Fatalf("wrong number of log events: got %d, want %d", len(logsCh), wantNew) | ||||
| 	} | ||||
| 	if len(rmLogsCh) != wantRemoved { | ||||
| 		t.Fatalf("wrong number of removed log events: got %d, want %d", len(rmLogsCh), wantRemoved) | ||||
| 	} | ||||
| 	var ( | ||||
| 		countNew int | ||||
| 		countRm  int | ||||
| 		prev     int | ||||
| 	) | ||||
| 	// Drain events.
 | ||||
| 	for i := 0; i < len(logsCh); i++ { | ||||
| 		<-logsCh | ||||
| 	for len(logsCh) > 0 { | ||||
| 		x := <-logsCh | ||||
| 		countNew += len(x) | ||||
| 		for _, log := range x { | ||||
| 			// We expect added logs to be in ascending order: 0:0, 0:1, 1:0 ...
 | ||||
| 			have := 100*int(log.BlockNumber) + int(log.TxIndex) | ||||
| 			if have < prev { | ||||
| 				t.Fatalf("Expected new logs to arrive in ascending order (%d < %d)", have, prev) | ||||
| 			} | ||||
| 			prev = have | ||||
| 		} | ||||
| 	} | ||||
| 	for i := 0; i < len(rmLogsCh); i++ { | ||||
| 		<-rmLogsCh | ||||
| 	prev = 0 | ||||
| 	for len(rmLogsCh) > 0 { | ||||
| 		x := <-rmLogsCh | ||||
| 		countRm += len(x.Logs) | ||||
| 		for _, log := range x.Logs { | ||||
| 			// We expect removed logs to be in ascending order: 0:0, 0:1, 1:0 ...
 | ||||
| 			have := 100*int(log.BlockNumber) + int(log.TxIndex) | ||||
| 			if have < prev { | ||||
| 				t.Fatalf("Expected removed logs to arrive in ascending order (%d < %d)", have, prev) | ||||
| 			} | ||||
| 			prev = have | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if countNew != wantNew { | ||||
| 		t.Fatalf("wrong number of log events: got %d, want %d", countNew, wantNew) | ||||
| 	} | ||||
| 	if countRm != wantRemoved { | ||||
| 		t.Fatalf("wrong number of removed log events: got %d, want %d", countRm, wantRemoved) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user