core, core/types: readd transactions after chain re-org
Added a `Difference` method to `types.Transactions` which sets the receiver to the difference of a to b (NOTE: not a **and** b). Transaction pool subscribes to RemovedTransactionEvent adding back to those potential missing from the chain. When a chain re-org occurs remove any transactions that were removed from the canonical chain during the re-org as well as the receipts that were generated in the process. Closes #1746
This commit is contained in:
		
							parent
							
								
									12c0afe4fe
								
							
						
					
					
						commit
						eaa4473dbd
					
				| @ -92,7 +92,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *eth | |||||||
| 
 | 
 | ||||||
| 	db, _ := ethdb.NewMemDatabase() | 	db, _ := ethdb.NewMemDatabase() | ||||||
| 
 | 
 | ||||||
| 	core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance)) | 	core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)}) | ||||||
| 	ks := crypto.NewKeyStorePlain(filepath.Join(tmp, "keystore")) | 	ks := crypto.NewKeyStorePlain(filepath.Join(tmp, "keystore")) | ||||||
| 	am := accounts.NewManager(ks) | 	am := accounts.NewManager(ks) | ||||||
| 	conf := ð.Config{ | 	conf := ð.Config{ | ||||||
|  | |||||||
| @ -134,7 +134,7 @@ func testEth(t *testing.T) (ethereum *eth.Ethereum, err error) { | |||||||
| 
 | 
 | ||||||
| 	db, _ := ethdb.NewMemDatabase() | 	db, _ := ethdb.NewMemDatabase() | ||||||
| 	// set up mock genesis with balance on the testAddress
 | 	// set up mock genesis with balance on the testAddress
 | ||||||
| 	core.WriteGenesisBlockForTesting(db, common.HexToAddress(testAddress), common.String2Big(testBalance)) | 	core.WriteGenesisBlockForTesting(db, core.GenesisAccount{common.HexToAddress(testAddress), common.String2Big(testBalance)}) | ||||||
| 
 | 
 | ||||||
| 	// only use minimalistic stack with no networking
 | 	// only use minimalistic stack with no networking
 | ||||||
| 	ethereum, err = eth.New(ð.Config{ | 	ethereum, err = eth.New(ð.Config{ | ||||||
|  | |||||||
| @ -162,7 +162,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { | |||||||
| 
 | 
 | ||||||
| 	// Generate a chain of b.N blocks using the supplied block
 | 	// Generate a chain of b.N blocks using the supplied block
 | ||||||
| 	// generator function.
 | 	// generator function.
 | ||||||
| 	genesis := WriteGenesisBlockForTesting(db, benchRootAddr, benchRootFunds) | 	genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds}) | ||||||
| 	chain := GenerateChain(genesis, db, b.N, gen) | 	chain := GenerateChain(genesis, db, b.N, gen) | ||||||
| 
 | 
 | ||||||
| 	// Time the insertion of the new chain.
 | 	// Time the insertion of the new chain.
 | ||||||
|  | |||||||
| @ -42,7 +42,7 @@ func ExampleGenerateChain() { | |||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	// Ensure that key1 has some funds in the genesis block.
 | 	// Ensure that key1 has some funds in the genesis block.
 | ||||||
| 	genesis := WriteGenesisBlockForTesting(db, addr1, big.NewInt(1000000)) | 	genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr1, big.NewInt(1000000)}) | ||||||
| 
 | 
 | ||||||
| 	// This call generates a chain of 5 blocks. The function runs for
 | 	// This call generates a chain of 5 blocks. The function runs for
 | ||||||
| 	// each block and adds different features to gen based on the
 | 	// each block and adds different features to gen based on the
 | ||||||
|  | |||||||
| @ -569,18 +569,17 @@ func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, er | |||||||
| 		// chain fork
 | 		// chain fork
 | ||||||
| 		if block.ParentHash() != cblock.Hash() { | 		if block.ParentHash() != cblock.Hash() { | ||||||
| 			// during split we merge two different chains and create the new canonical chain
 | 			// during split we merge two different chains and create the new canonical chain
 | ||||||
| 			err := self.merge(cblock, block) | 			err := self.reorg(cblock, block) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return NonStatTy, err | 				return NonStatTy, err | ||||||
| 			} | 			} | ||||||
| 			status = SplitStatTy |  | ||||||
| 		} | 		} | ||||||
|  | 		status = CanonStatTy | ||||||
|  | 
 | ||||||
| 		self.mu.Lock() | 		self.mu.Lock() | ||||||
| 		self.setTotalDifficulty(td) | 		self.setTotalDifficulty(td) | ||||||
| 		self.insert(block) | 		self.insert(block) | ||||||
| 		self.mu.Unlock() | 		self.mu.Unlock() | ||||||
| 
 |  | ||||||
| 		status = CanonStatTy |  | ||||||
| 	} else { | 	} else { | ||||||
| 		status = SideStatTy | 		status = SideStatTy | ||||||
| 	} | 	} | ||||||
| @ -681,9 +680,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 
 | 
 | ||||||
| 			return i, err | 			return i, err | ||||||
| 		} | 		} | ||||||
|  | 		if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil { | ||||||
|  | 			glog.V(logger.Warn).Infoln("error writing block receipts:", err) | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 		txcount += len(block.Transactions()) | 		txcount += len(block.Transactions()) | ||||||
| 
 |  | ||||||
| 		// write the block to the chain and get the status
 | 		// write the block to the chain and get the status
 | ||||||
| 		status, err := self.WriteBlock(block) | 		status, err := self.WriteBlock(block) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @ -711,10 +712,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 			queue[i] = ChainSplitEvent{block, logs} | 			queue[i] = ChainSplitEvent{block, logs} | ||||||
| 			queueEvent.splitCount++ | 			queueEvent.splitCount++ | ||||||
| 		} | 		} | ||||||
| 		if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil { |  | ||||||
| 			glog.V(logger.Warn).Infoln("error writing block receipts:", err) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		stats.processed++ | 		stats.processed++ | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -729,20 +726,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 	return 0, nil | 	return 0, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
 | // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
 | ||||||
| // to be part of the new canonical chain.
 | // to be part of the new canonical chain and accumulates potential missing transactions and post an
 | ||||||
| func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) { | // event about them
 | ||||||
|  | func (self *ChainManager) reorg(oldBlock, newBlock *types.Block) error { | ||||||
|  | 	self.mu.Lock() | ||||||
|  | 	defer self.mu.Unlock() | ||||||
|  | 
 | ||||||
| 	var ( | 	var ( | ||||||
| 		newChain    types.Blocks | 		newChain    types.Blocks | ||||||
| 		commonBlock *types.Block | 		commonBlock *types.Block | ||||||
| 		oldStart    = oldBlock | 		oldStart    = oldBlock | ||||||
| 		newStart    = newBlock | 		newStart    = newBlock | ||||||
|  | 		deletedTxs  types.Transactions | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	// first reduce whoever is higher bound
 | 	// first reduce whoever is higher bound
 | ||||||
| 	if oldBlock.NumberU64() > newBlock.NumberU64() { | 	if oldBlock.NumberU64() > newBlock.NumberU64() { | ||||||
| 		// reduce old chain
 | 		// reduce old chain
 | ||||||
| 		for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { | 		for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { | ||||||
|  | 			deletedTxs = append(deletedTxs, oldBlock.Transactions()...) | ||||||
| 		} | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		// reduce new chain and append new chain blocks for inserting later on
 | 		// reduce new chain and append new chain blocks for inserting later on
 | ||||||
| @ -751,10 +754,10 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if oldBlock == nil { | 	if oldBlock == nil { | ||||||
| 		return nil, fmt.Errorf("Invalid old chain") | 		return fmt.Errorf("Invalid old chain") | ||||||
| 	} | 	} | ||||||
| 	if newBlock == nil { | 	if newBlock == nil { | ||||||
| 		return nil, fmt.Errorf("Invalid new chain") | 		return fmt.Errorf("Invalid new chain") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	numSplit := newBlock.Number() | 	numSplit := newBlock.Number() | ||||||
| @ -764,13 +767,14 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e | |||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 		newChain = append(newChain, newBlock) | 		newChain = append(newChain, newBlock) | ||||||
|  | 		deletedTxs = append(deletedTxs, oldBlock.Transactions()...) | ||||||
| 
 | 
 | ||||||
| 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) | 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) | ||||||
| 		if oldBlock == nil { | 		if oldBlock == nil { | ||||||
| 			return nil, fmt.Errorf("Invalid old chain") | 			return fmt.Errorf("Invalid old chain") | ||||||
| 		} | 		} | ||||||
| 		if newBlock == nil { | 		if newBlock == nil { | ||||||
| 			return nil, fmt.Errorf("Invalid new chain") | 			return fmt.Errorf("Invalid new chain") | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -779,18 +783,8 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e | |||||||
| 		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) | 		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return newChain, nil | 	var addedTxs types.Transactions | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // merge merges two different chain to the new canonical chain
 |  | ||||||
| func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { |  | ||||||
| 	newChain, err := self.diff(oldBlock, newBlock) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("chain reorg failed: %v", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
 | 	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
 | ||||||
| 	self.mu.Lock() |  | ||||||
| 	for _, block := range newChain { | 	for _, block := range newChain { | ||||||
| 		// insert the block in the canonical way, re-writing history
 | 		// insert the block in the canonical way, re-writing history
 | ||||||
| 		self.insert(block) | 		self.insert(block) | ||||||
| @ -798,8 +792,18 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { | |||||||
| 		PutTransactions(self.chainDb, block, block.Transactions()) | 		PutTransactions(self.chainDb, block, block.Transactions()) | ||||||
| 		PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash())) | 		PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash())) | ||||||
| 
 | 
 | ||||||
|  | 		addedTxs = append(addedTxs, block.Transactions()...) | ||||||
| 	} | 	} | ||||||
| 	self.mu.Unlock() | 
 | ||||||
|  | 	// calculate the difference between deleted and added transactions
 | ||||||
|  | 	diff := types.TxDifference(deletedTxs, addedTxs) | ||||||
|  | 	// When transactions get deleted from the database that means the
 | ||||||
|  | 	// receipts that were created in the fork must also be deleted
 | ||||||
|  | 	for _, tx := range diff { | ||||||
|  | 		DeleteReceipt(self.chainDb, tx.Hash()) | ||||||
|  | 		DeleteTransaction(self.chainDb, tx.Hash()) | ||||||
|  | 	} | ||||||
|  | 	self.eventMux.Post(RemovedTransactionEvent{diff}) | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | |||||||
| @ -30,8 +30,10 @@ import ( | |||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
| 	"github.com/ethereum/go-ethereum/core/state" | 	"github.com/ethereum/go-ethereum/core/state" | ||||||
| 	"github.com/ethereum/go-ethereum/core/types" | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
|  | 	"github.com/ethereum/go-ethereum/crypto" | ||||||
| 	"github.com/ethereum/go-ethereum/ethdb" | 	"github.com/ethereum/go-ethereum/ethdb" | ||||||
| 	"github.com/ethereum/go-ethereum/event" | 	"github.com/ethereum/go-ethereum/event" | ||||||
|  | 	"github.com/ethereum/go-ethereum/params" | ||||||
| 	"github.com/ethereum/go-ethereum/pow" | 	"github.com/ethereum/go-ethereum/pow" | ||||||
| 	"github.com/ethereum/go-ethereum/rlp" | 	"github.com/ethereum/go-ethereum/rlp" | ||||||
| 	"github.com/hashicorp/golang-lru" | 	"github.com/hashicorp/golang-lru" | ||||||
| @ -483,19 +485,115 @@ func TestInsertNonceError(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /* | // Tests that chain reorganizations handle transaction removals and reinsertions.
 | ||||||
| func TestGenesisMismatch(t *testing.T) { | func TestChainTxReorgs(t *testing.T) { | ||||||
| 	db, _ := ethdb.NewMemDatabase() | 	params.MinGasLimit = big.NewInt(125000)      // Minimum the gas limit may ever be.
 | ||||||
| 	var mux event.TypeMux | 	params.GenesisGasLimit = big.NewInt(3141592) // Gas limit of the Genesis block.
 | ||||||
| 	genesis := GenesisBlock(0, db) | 
 | ||||||
| 	_, err := NewChainManager(genesis, db, db, db, thePow(), &mux) | 	var ( | ||||||
| 	if err != nil { | 		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") | ||||||
| 		t.Error(err) | 		key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") | ||||||
|  | 		key3, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") | ||||||
|  | 		addr1   = crypto.PubkeyToAddress(key1.PublicKey) | ||||||
|  | 		addr2   = crypto.PubkeyToAddress(key2.PublicKey) | ||||||
|  | 		addr3   = crypto.PubkeyToAddress(key3.PublicKey) | ||||||
|  | 		db, _   = ethdb.NewMemDatabase() | ||||||
|  | 	) | ||||||
|  | 	genesis := WriteGenesisBlockForTesting(db, | ||||||
|  | 		GenesisAccount{addr1, big.NewInt(1000000)}, | ||||||
|  | 		GenesisAccount{addr2, big.NewInt(1000000)}, | ||||||
|  | 		GenesisAccount{addr3, big.NewInt(1000000)}, | ||||||
|  | 	) | ||||||
|  | 	// Create two transactions shared between the chains:
 | ||||||
|  | 	//  - postponed: transaction included at a later block in the forked chain
 | ||||||
|  | 	//  - swapped: transaction included at the same block number in the forked chain
 | ||||||
|  | 	postponed, _ := types.NewTransaction(0, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1) | ||||||
|  | 	swapped, _ := types.NewTransaction(1, addr1, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key1) | ||||||
|  | 
 | ||||||
|  | 	// Create two transactions that will be dropped by the forked chain:
 | ||||||
|  | 	//  - pastDrop: transaction dropped retroactively from a past block
 | ||||||
|  | 	//  - freshDrop: transaction dropped exactly at the block where the reorg is detected
 | ||||||
|  | 	var pastDrop, freshDrop *types.Transaction | ||||||
|  | 
 | ||||||
|  | 	// Create three transactions that will be added in the forked chain:
 | ||||||
|  | 	//  - pastAdd:   transaction added before the reorganiztion is detected
 | ||||||
|  | 	//  - freshAdd:  transaction added at the exact block the reorg is detected
 | ||||||
|  | 	//  - futureAdd: transaction added after the reorg has already finished
 | ||||||
|  | 	var pastAdd, freshAdd, futureAdd *types.Transaction | ||||||
|  | 
 | ||||||
|  | 	chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) { | ||||||
|  | 		switch i { | ||||||
|  | 		case 0: | ||||||
|  | 			pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2) | ||||||
|  | 
 | ||||||
|  | 			gen.AddTx(pastDrop)  // This transaction will be dropped in the fork from below the split point
 | ||||||
|  | 			gen.AddTx(postponed) // This transaction will be postponed till block #3 in the fork
 | ||||||
|  | 
 | ||||||
|  | 		case 2: | ||||||
|  | 			freshDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2) | ||||||
|  | 
 | ||||||
|  | 			gen.AddTx(freshDrop) // This transaction will be dropped in the fork from exactly at the split point
 | ||||||
|  | 			gen.AddTx(swapped)   // This transaction will be swapped out at the exact height
 | ||||||
|  | 
 | ||||||
|  | 			gen.OffsetTime(9) // Lower the block difficulty to simulate a weaker chain
 | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	// Import the chain. This runs all block validation rules.
 | ||||||
|  | 	evmux := &event.TypeMux{} | ||||||
|  | 	chainman, _ := NewChainManager(db, FakePow{}, evmux) | ||||||
|  | 	chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux)) | ||||||
|  | 	if i, err := chainman.InsertChain(chain); err != nil { | ||||||
|  | 		t.Fatalf("failed to insert original chain[%d]: %v", i, err) | ||||||
| 	} | 	} | ||||||
| 	genesis = GenesisBlock(1, db) | 
 | ||||||
| 	_, err = NewChainManager(genesis, db, db, db, thePow(), &mux) | 	// overwrite the old chain
 | ||||||
| 	if err == nil { | 	chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) { | ||||||
| 		t.Error("expected genesis mismatch error") | 		switch i { | ||||||
|  | 		case 0: | ||||||
|  | 			pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) | ||||||
|  | 			gen.AddTx(pastAdd) // This transaction needs to be injected during reorg
 | ||||||
|  | 
 | ||||||
|  | 		case 2: | ||||||
|  | 			gen.AddTx(postponed) // This transaction was postponed from block #1 in the original chain
 | ||||||
|  | 			gen.AddTx(swapped)   // This transaction was swapped from the exact current spot in the original chain
 | ||||||
|  | 
 | ||||||
|  | 			freshAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) | ||||||
|  | 			gen.AddTx(freshAdd) // This transaction will be added exactly at reorg time
 | ||||||
|  | 
 | ||||||
|  | 		case 3: | ||||||
|  | 			futureAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3) | ||||||
|  | 			gen.AddTx(futureAdd) // This transaction will be added after a full reorg
 | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | 	if _, err := chainman.InsertChain(chain); err != nil { | ||||||
|  | 		t.Fatalf("failed to insert forked chain: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// removed tx
 | ||||||
|  | 	for i, tx := range (types.Transactions{pastDrop, freshDrop}) { | ||||||
|  | 		if GetTransaction(db, tx.Hash()) != nil { | ||||||
|  | 			t.Errorf("drop %d: tx found while shouldn't have been", i) | ||||||
|  | 		} | ||||||
|  | 		if GetReceipt(db, tx.Hash()) != nil { | ||||||
|  | 			t.Errorf("drop %d: receipt found while shouldn't have been", i) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	// added tx
 | ||||||
|  | 	for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) { | ||||||
|  | 		if GetTransaction(db, tx.Hash()) == nil { | ||||||
|  | 			t.Errorf("add %d: expected tx to be found", i) | ||||||
|  | 		} | ||||||
|  | 		if GetReceipt(db, tx.Hash()) == nil { | ||||||
|  | 			t.Errorf("add %d: expected receipt to be found", i) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	// shared tx
 | ||||||
|  | 	for i, tx := range (types.Transactions{postponed, swapped}) { | ||||||
|  | 		if GetTransaction(db, tx.Hash()) == nil { | ||||||
|  | 			t.Errorf("share %d: expected tx to be found", i) | ||||||
|  | 		} | ||||||
|  | 		if GetReceipt(db, tx.Hash()) == nil { | ||||||
|  | 			t.Errorf("share %d: expected receipt to be found", i) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| */ |  | ||||||
|  | |||||||
| @ -36,6 +36,9 @@ type NewBlockEvent struct{ Block *types.Block } | |||||||
| // NewMinedBlockEvent is posted when a block has been imported.
 | // NewMinedBlockEvent is posted when a block has been imported.
 | ||||||
| type NewMinedBlockEvent struct{ Block *types.Block } | type NewMinedBlockEvent struct{ Block *types.Block } | ||||||
| 
 | 
 | ||||||
|  | // RemovedTransactionEvent is posted when a reorg happens
 | ||||||
|  | type RemovedTransactionEvent struct{ Txs types.Transactions } | ||||||
|  | 
 | ||||||
| // ChainSplit is posted when a new head is detected
 | // ChainSplit is posted when a new head is detected
 | ||||||
| type ChainSplitEvent struct { | type ChainSplitEvent struct { | ||||||
| 	Block *types.Block | 	Block *types.Block | ||||||
|  | |||||||
| @ -125,15 +125,27 @@ func GenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big | |||||||
| 	return block | 	return block | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func WriteGenesisBlockForTesting(db ethdb.Database, addr common.Address, balance *big.Int) *types.Block { | type GenesisAccount struct { | ||||||
|  | 	Address common.Address | ||||||
|  | 	Balance *big.Int | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func WriteGenesisBlockForTesting(db ethdb.Database, accounts ...GenesisAccount) *types.Block { | ||||||
|  | 	accountJson := "{" | ||||||
|  | 	for i, account := range accounts { | ||||||
|  | 		if i != 0 { | ||||||
|  | 			accountJson += "," | ||||||
|  | 		} | ||||||
|  | 		accountJson += fmt.Sprintf(`"0x%x":{"balance":"0x%x"}`, account.Address, account.Balance.Bytes()) | ||||||
|  | 	} | ||||||
|  | 	accountJson += "}" | ||||||
|  | 
 | ||||||
| 	testGenesis := fmt.Sprintf(`{ | 	testGenesis := fmt.Sprintf(`{ | ||||||
| 	"nonce":"0x%x", | 	"nonce":"0x%x", | ||||||
| 	"gasLimit":"0x%x", | 	"gasLimit":"0x%x", | ||||||
| 	"difficulty":"0x%x", | 	"difficulty":"0x%x", | ||||||
| 	"alloc": { | 	"alloc": %s | ||||||
| 		"0x%x":{"balance":"0x%x"} | }`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), accountJson) | ||||||
| 	} |  | ||||||
| }`, types.EncodeNonce(0), params.GenesisGasLimit.Bytes(), params.GenesisDifficulty.Bytes(), addr, balance.Bytes()) |  | ||||||
| 	block, _ := WriteGenesisBlock(db, strings.NewReader(testGenesis)) | 	block, _ := WriteGenesisBlock(db, strings.NewReader(testGenesis)) | ||||||
| 	return block | 	return block | ||||||
| } | } | ||||||
|  | |||||||
| @ -81,7 +81,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( | |||||||
| 		gasLimit:     gasLimitFn, | 		gasLimit:     gasLimitFn, | ||||||
| 		minGasPrice:  new(big.Int), | 		minGasPrice:  new(big.Int), | ||||||
| 		pendingState: state.ManageState(currentStateFn()), | 		pendingState: state.ManageState(currentStateFn()), | ||||||
| 		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}), | 		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), | ||||||
| 	} | 	} | ||||||
| 	go pool.eventLoop() | 	go pool.eventLoop() | ||||||
| 
 | 
 | ||||||
| @ -93,16 +93,18 @@ func (pool *TxPool) eventLoop() { | |||||||
| 	// we need to know the new state. The new state will help us determine
 | 	// we need to know the new state. The new state will help us determine
 | ||||||
| 	// the nonces in the managed state
 | 	// the nonces in the managed state
 | ||||||
| 	for ev := range pool.events.Chan() { | 	for ev := range pool.events.Chan() { | ||||||
| 		pool.mu.Lock() |  | ||||||
| 
 |  | ||||||
| 		switch ev := ev.(type) { | 		switch ev := ev.(type) { | ||||||
| 		case ChainHeadEvent: | 		case ChainHeadEvent: | ||||||
|  | 			pool.mu.Lock() | ||||||
| 			pool.resetState() | 			pool.resetState() | ||||||
|  | 			pool.mu.Unlock() | ||||||
| 		case GasPriceChanged: | 		case GasPriceChanged: | ||||||
|  | 			pool.mu.Lock() | ||||||
| 			pool.minGasPrice = ev.Price | 			pool.minGasPrice = ev.Price | ||||||
|  | 			pool.mu.Unlock() | ||||||
|  | 		case RemovedTransactionEvent: | ||||||
|  | 			pool.AddTransactions(ev.Txs) | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 		pool.mu.Unlock() |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -238,3 +238,15 @@ func TestNonceRecovery(t *testing.T) { | |||||||
| 		t.Errorf("expected nonce to be %d, got %d", n+1, fn) | 		t.Errorf("expected nonce to be %d, got %d", n+1, fn) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestRemovedTxEvent(t *testing.T) { | ||||||
|  | 	pool, key := setupTxPool() | ||||||
|  | 	tx := transaction(0, big.NewInt(1000000), key) | ||||||
|  | 	from, _ := tx.From() | ||||||
|  | 	pool.currentState().AddBalance(from, big.NewInt(1000000000000)) | ||||||
|  | 	pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) | ||||||
|  | 	pool.eventMux.Post(ChainHeadEvent{nil}) | ||||||
|  | 	if len(pool.pending) != 1 { | ||||||
|  | 		t.Error("expected 1 pending tx, got", len(pool.pending)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
| @ -77,6 +77,22 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func DeleteTransaction(db ethdb.Database, txHash common.Hash) { | ||||||
|  | 	db.Delete(txHash[:]) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction { | ||||||
|  | 	data, _ := db.Get(txhash[:]) | ||||||
|  | 	if len(data) != 0 { | ||||||
|  | 		var tx types.Transaction | ||||||
|  | 		if err := rlp.DecodeBytes(data, &tx); err != nil { | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		return &tx | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // PutReceipts stores the receipts in the current database
 | // PutReceipts stores the receipts in the current database
 | ||||||
| func PutReceipts(db ethdb.Database, receipts types.Receipts) error { | func PutReceipts(db ethdb.Database, receipts types.Receipts) error { | ||||||
| 	batch := new(leveldb.Batch) | 	batch := new(leveldb.Batch) | ||||||
| @ -107,6 +123,11 @@ func PutReceipts(db ethdb.Database, receipts types.Receipts) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // Delete a receipts from the database
 | ||||||
|  | func DeleteReceipt(db ethdb.Database, txHash common.Hash) { | ||||||
|  | 	db.Delete(append(receiptsPre, txHash[:]...)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // GetReceipt returns a receipt by hash
 | // GetReceipt returns a receipt by hash
 | ||||||
| func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt { | func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt { | ||||||
| 	data, _ := db.Get(append(receiptsPre, txHash[:]...)) | 	data, _ := db.Get(append(receiptsPre, txHash[:]...)) | ||||||
|  | |||||||
| @ -272,14 +272,36 @@ func (tx *Transaction) String() string { | |||||||
| // Transaction slice type for basic sorting.
 | // Transaction slice type for basic sorting.
 | ||||||
| type Transactions []*Transaction | type Transactions []*Transaction | ||||||
| 
 | 
 | ||||||
| func (s Transactions) Len() int      { return len(s) } | // Len returns the length of s
 | ||||||
|  | func (s Transactions) Len() int { return len(s) } | ||||||
|  | 
 | ||||||
|  | // Swap swaps the i'th and the j'th element in s
 | ||||||
| func (s Transactions) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | func (s Transactions) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | ||||||
| 
 | 
 | ||||||
|  | // GetRlp implements Rlpable and returns the i'th element of s in rlp
 | ||||||
| func (s Transactions) GetRlp(i int) []byte { | func (s Transactions) GetRlp(i int) []byte { | ||||||
| 	enc, _ := rlp.EncodeToBytes(s[i]) | 	enc, _ := rlp.EncodeToBytes(s[i]) | ||||||
| 	return enc | 	return enc | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // Returns a new set t which is the difference between a to b
 | ||||||
|  | func TxDifference(a, b Transactions) (keep Transactions) { | ||||||
|  | 	keep = make(Transactions, 0, len(a)) | ||||||
|  | 
 | ||||||
|  | 	remove := make(map[common.Hash]struct{}) | ||||||
|  | 	for _, tx := range b { | ||||||
|  | 		remove[tx.Hash()] = struct{}{} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, tx := range a { | ||||||
|  | 		if _, ok := remove[tx.Hash()]; !ok { | ||||||
|  | 			keep = append(keep, tx) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return keep | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type TxByNonce struct{ Transactions } | type TxByNonce struct{ Transactions } | ||||||
| 
 | 
 | ||||||
| func (s TxByNonce) Less(i, j int) bool { | func (s TxByNonce) Less(i, j int) bool { | ||||||
|  | |||||||
| @ -33,7 +33,7 @@ func newTestProtocolManager(blocks int, generator func(int, *core.BlockGen), new | |||||||
| 		evmux       = new(event.TypeMux) | 		evmux       = new(event.TypeMux) | ||||||
| 		pow         = new(core.FakePow) | 		pow         = new(core.FakePow) | ||||||
| 		db, _       = ethdb.NewMemDatabase() | 		db, _       = ethdb.NewMemDatabase() | ||||||
| 		genesis     = core.WriteGenesisBlockForTesting(db, testBankAddress, testBankFunds) | 		genesis     = core.WriteGenesisBlockForTesting(db, core.GenesisAccount{testBankAddress, testBankFunds}) | ||||||
| 		chainman, _ = core.NewChainManager(db, pow, evmux) | 		chainman, _ = core.NewChainManager(db, pow, evmux) | ||||||
| 		blockproc   = core.NewBlockProcessor(db, pow, chainman, evmux) | 		blockproc   = core.NewBlockProcessor(db, pow, chainman, evmux) | ||||||
| 	) | 	) | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user