les, light: implement ODR transaction lookup by hash (#19069)
* les, light: implement ODR transaction lookup by hash * les: delete useless file * internal/ethapi: always use backend to find transaction * les, eth, internal/ethapi: renamed GetCanonicalTransaction to GetTransaction * light: add canonical header verification to GetTransaction
This commit is contained in:
parent
f4fb1a1801
commit
40cdcf8c47
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common/math"
|
"github.com/ethereum/go-ethereum/common/math"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/core/vm"
|
"github.com/ethereum/go-ethereum/core/vm"
|
||||||
@ -178,6 +179,11 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
|
|||||||
return b.eth.txPool.Get(hash)
|
return b.eth.txPool.Get(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
|
||||||
|
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash)
|
||||||
|
return tx, blockHash, blockNumber, index, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
|
func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
|
||||||
return b.eth.txPool.State().GetNonce(addr), nil
|
return b.eth.txPool.State().GetNonce(addr), nil
|
||||||
}
|
}
|
||||||
|
@ -1155,25 +1155,32 @@ func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, addr
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetTransactionByHash returns the transaction for the given hash
|
// GetTransactionByHash returns the transaction for the given hash
|
||||||
func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) *RPCTransaction {
|
func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
|
||||||
// Try to return an already finalized transaction
|
// Try to return an already finalized transaction
|
||||||
if tx, blockHash, blockNumber, index := rawdb.ReadTransaction(s.b.ChainDb(), hash); tx != nil {
|
tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
|
||||||
return newRPCTransaction(tx, blockHash, blockNumber, index)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if tx != nil {
|
||||||
|
return newRPCTransaction(tx, blockHash, blockNumber, index), nil
|
||||||
}
|
}
|
||||||
// No finalized transaction, try to retrieve it from the pool
|
// No finalized transaction, try to retrieve it from the pool
|
||||||
if tx := s.b.GetPoolTransaction(hash); tx != nil {
|
if tx := s.b.GetPoolTransaction(hash); tx != nil {
|
||||||
return newRPCPendingTransaction(tx)
|
return newRPCPendingTransaction(tx), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transaction unknown, return as such
|
// Transaction unknown, return as such
|
||||||
return nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRawTransactionByHash returns the bytes of the transaction for the given hash.
|
// GetRawTransactionByHash returns the bytes of the transaction for the given hash.
|
||||||
func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
|
func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
|
||||||
var tx *types.Transaction
|
|
||||||
|
|
||||||
// Retrieve a finalized transaction, or a pooled otherwise
|
// Retrieve a finalized transaction, or a pooled otherwise
|
||||||
if tx, _, _, _ = rawdb.ReadTransaction(s.b.ChainDb(), hash); tx == nil {
|
tx, _, _, _, err := s.b.GetTransaction(ctx, hash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if tx == nil {
|
||||||
if tx = s.b.GetPoolTransaction(hash); tx == nil {
|
if tx = s.b.GetPoolTransaction(hash); tx == nil {
|
||||||
// Transaction not found anywhere, abort
|
// Transaction not found anywhere, abort
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -62,6 +62,7 @@ type Backend interface {
|
|||||||
|
|
||||||
// TxPool API
|
// TxPool API
|
||||||
SendTx(ctx context.Context, signedTx *types.Transaction) error
|
SendTx(ctx context.Context, signedTx *types.Transaction) error
|
||||||
|
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
|
||||||
GetPoolTransactions() (types.Transactions, error)
|
GetPoolTransactions() (types.Transactions, error)
|
||||||
GetPoolTransaction(txHash common.Hash) *types.Transaction
|
GetPoolTransaction(txHash common.Hash) *types.Transaction
|
||||||
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
|
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
|
||||||
|
@ -132,6 +132,10 @@ func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transactio
|
|||||||
return b.eth.txPool.GetTransaction(txHash)
|
return b.eth.txPool.GetTransaction(txHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
|
||||||
|
return light.GetTransaction(ctx, b.eth.odr, txHash)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
|
func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
|
||||||
return b.eth.txPool.GetNonce(ctx, addr)
|
return b.eth.txPool.GetNonce(ctx, addr)
|
||||||
}
|
}
|
||||||
|
@ -114,9 +114,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
|||||||
if leth.config.ULC != nil {
|
if leth.config.ULC != nil {
|
||||||
trustedNodes = leth.config.ULC.TrustedServers
|
trustedNodes = leth.config.ULC.TrustedServers
|
||||||
}
|
}
|
||||||
leth.relay = NewLesTxRelay(peers, leth.reqDist)
|
|
||||||
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes)
|
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes)
|
||||||
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
|
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
|
||||||
|
leth.relay = NewLesTxRelay(peers, leth.retriever)
|
||||||
|
|
||||||
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
|
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
|
||||||
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations)
|
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations)
|
||||||
@ -271,6 +271,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
|
|||||||
// Ethereum protocol.
|
// Ethereum protocol.
|
||||||
func (s *LightEthereum) Stop() error {
|
func (s *LightEthereum) Stop() error {
|
||||||
s.odr.Stop()
|
s.odr.Stop()
|
||||||
|
s.relay.Stop()
|
||||||
s.bloomIndexer.Close()
|
s.bloomIndexer.Close()
|
||||||
s.chtIndexer.Close()
|
s.chtIndexer.Close()
|
||||||
s.blockchain.Stop()
|
s.blockchain.Stop()
|
||||||
|
@ -979,7 +979,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
return errResp(ErrRequestRejected, "")
|
return errResp(ErrRequestRejected, "")
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
stats := make([]txStatus, len(req.Txs))
|
stats := make([]light.TxStatus, len(req.Txs))
|
||||||
for i, tx := range req.Txs {
|
for i, tx := range req.Txs {
|
||||||
if i != 0 && !task.waitOrStop() {
|
if i != 0 && !task.waitOrStop() {
|
||||||
return
|
return
|
||||||
@ -1014,7 +1014,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
return errResp(ErrRequestRejected, "")
|
return errResp(ErrRequestRejected, "")
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
stats := make([]txStatus, len(req.Hashes))
|
stats := make([]light.TxStatus, len(req.Hashes))
|
||||||
for i, hash := range req.Hashes {
|
for i, hash := range req.Hashes {
|
||||||
if i != 0 && !task.waitOrStop() {
|
if i != 0 && !task.waitOrStop() {
|
||||||
return
|
return
|
||||||
@ -1032,7 +1032,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
p.Log().Trace("Received tx status response")
|
p.Log().Trace("Received tx status response")
|
||||||
var resp struct {
|
var resp struct {
|
||||||
ReqID, BV uint64
|
ReqID, BV uint64
|
||||||
Status []txStatus
|
Status []light.TxStatus
|
||||||
}
|
}
|
||||||
if err := msg.Decode(&resp); err != nil {
|
if err := msg.Decode(&resp); err != nil {
|
||||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||||
@ -1040,6 +1040,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
|
|
||||||
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
|
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
|
||||||
|
|
||||||
|
p.Log().Trace("Received helper trie proof response")
|
||||||
|
deliverMsg = &Msg{
|
||||||
|
MsgType: MsgTxStatus,
|
||||||
|
ReqID: resp.ReqID,
|
||||||
|
Obj: resp.Status,
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
p.Log().Trace("Received unknown message", "code", msg.Code)
|
p.Log().Trace("Received unknown message", "code", msg.Code)
|
||||||
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
||||||
@ -1097,8 +1104,8 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) txStatus(hash common.Hash) txStatus {
|
func (pm *ProtocolManager) txStatus(hash common.Hash) light.TxStatus {
|
||||||
var stat txStatus
|
var stat light.TxStatus
|
||||||
stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
|
stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
|
||||||
// If the transaction is unknown to the pool, try looking it up locally
|
// If the transaction is unknown to the pool, try looking it up locally
|
||||||
if stat.Status == core.TxStatusUnknown {
|
if stat.Status == core.TxStatusUnknown {
|
||||||
|
@ -443,7 +443,7 @@ func TestTransactionStatusLes2(t *testing.T) {
|
|||||||
|
|
||||||
var reqID uint64
|
var reqID uint64
|
||||||
|
|
||||||
test := func(tx *types.Transaction, send bool, expStatus txStatus) {
|
test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) {
|
||||||
reqID++
|
reqID++
|
||||||
if send {
|
if send {
|
||||||
cost := peer.GetRequestCost(SendTxV2Msg, 1)
|
cost := peer.GetRequestCost(SendTxV2Msg, 1)
|
||||||
@ -452,7 +452,7 @@ func TestTransactionStatusLes2(t *testing.T) {
|
|||||||
cost := peer.GetRequestCost(GetTxStatusMsg, 1)
|
cost := peer.GetRequestCost(GetTxStatusMsg, 1)
|
||||||
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
|
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
|
||||||
}
|
}
|
||||||
if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil {
|
if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
|
||||||
t.Errorf("transaction status mismatch")
|
t.Errorf("transaction status mismatch")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -461,20 +461,20 @@ func TestTransactionStatusLes2(t *testing.T) {
|
|||||||
|
|
||||||
// test error status by sending an underpriced transaction
|
// test error status by sending an underpriced transaction
|
||||||
tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
|
tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
|
||||||
test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})
|
test(tx0, true, light.TxStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})
|
||||||
|
|
||||||
tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
||||||
test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
|
test(tx1, false, light.TxStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
|
||||||
test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
|
test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
|
||||||
test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error
|
test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // adding it again should not return an error
|
||||||
|
|
||||||
tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
||||||
tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
|
||||||
// send transactions in the wrong order, tx3 should be queued
|
// send transactions in the wrong order, tx3 should be queued
|
||||||
test(tx3, true, txStatus{Status: core.TxStatusQueued})
|
test(tx3, true, light.TxStatus{Status: core.TxStatusQueued})
|
||||||
test(tx2, true, txStatus{Status: core.TxStatusPending})
|
test(tx2, true, light.TxStatus{Status: core.TxStatusPending})
|
||||||
// query again, now tx3 should be pending too
|
// query again, now tx3 should be pending too
|
||||||
test(tx3, false, txStatus{Status: core.TxStatusPending})
|
test(tx3, false, light.TxStatus{Status: core.TxStatusPending})
|
||||||
|
|
||||||
// generate and add a block with tx1 and tx2 included
|
// generate and add a block with tx1 and tx2 included
|
||||||
gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) {
|
gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) {
|
||||||
@ -497,8 +497,8 @@ func TestTransactionStatusLes2(t *testing.T) {
|
|||||||
|
|
||||||
// check if their status is included now
|
// check if their status is included now
|
||||||
block1hash := rawdb.ReadCanonicalHash(db, 1)
|
block1hash := rawdb.ReadCanonicalHash(db, 1)
|
||||||
test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
|
test(tx1, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
|
||||||
test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
|
test(tx2, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
|
||||||
|
|
||||||
// create a reorg that rolls them back
|
// create a reorg that rolls them back
|
||||||
gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {})
|
gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {})
|
||||||
@ -516,6 +516,6 @@ func TestTransactionStatusLes2(t *testing.T) {
|
|||||||
t.Fatalf("pending count mismatch: have %d, want 3", pending)
|
t.Fatalf("pending count mismatch: have %d, want 3", pending)
|
||||||
}
|
}
|
||||||
// check if their status is pending again
|
// check if their status is pending again
|
||||||
test(tx1, false, txStatus{Status: core.TxStatusPending})
|
test(tx1, false, light.TxStatus{Status: core.TxStatusPending})
|
||||||
test(tx2, false, txStatus{Status: core.TxStatusPending})
|
test(tx2, false, light.TxStatus{Status: core.TxStatusPending})
|
||||||
}
|
}
|
||||||
|
@ -148,6 +148,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
|
|||||||
}
|
}
|
||||||
genesis = gspec.MustCommit(db)
|
genesis = gspec.MustCommit(db)
|
||||||
chain BlockChain
|
chain BlockChain
|
||||||
|
pool txPool
|
||||||
)
|
)
|
||||||
if peers == nil {
|
if peers == nil {
|
||||||
peers = newPeerSet()
|
peers = newPeerSet()
|
||||||
@ -162,13 +163,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
chain = blockchain
|
chain = blockchain
|
||||||
|
pool = core.NewTxPool(core.DefaultTxPoolConfig, gspec.Config, blockchain)
|
||||||
}
|
}
|
||||||
|
|
||||||
indexConfig := light.TestServerIndexerConfig
|
indexConfig := light.TestServerIndexerConfig
|
||||||
if lightSync {
|
if lightSync {
|
||||||
indexConfig = light.TestClientIndexerConfig
|
indexConfig = light.TestClientIndexerConfig
|
||||||
}
|
}
|
||||||
pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
|
pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, pool, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -86,6 +86,7 @@ const (
|
|||||||
MsgReceipts
|
MsgReceipts
|
||||||
MsgProofsV2
|
MsgProofsV2
|
||||||
MsgHelperTrieProofs
|
MsgHelperTrieProofs
|
||||||
|
MsgTxStatus
|
||||||
)
|
)
|
||||||
|
|
||||||
// Msg encodes a LES message that delivers reply data for a request
|
// Msg encodes a LES message that delivers reply data for a request
|
||||||
|
@ -66,6 +66,8 @@ func LesRequest(req light.OdrRequest) LesOdrRequest {
|
|||||||
return (*ChtRequest)(r)
|
return (*ChtRequest)(r)
|
||||||
case *light.BloomRequest:
|
case *light.BloomRequest:
|
||||||
return (*BloomRequest)(r)
|
return (*BloomRequest)(r)
|
||||||
|
case *light.TxStatusRequest:
|
||||||
|
return (*TxStatusRequest)(r)
|
||||||
default:
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -471,6 +473,44 @@ func (r *BloomRequest) Validate(db ethdb.Database, msg *Msg) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TxStatusRequest is the ODR request type for transaction status
|
||||||
|
type TxStatusRequest light.TxStatusRequest
|
||||||
|
|
||||||
|
// GetCost returns the cost of the given ODR request according to the serving
|
||||||
|
// peer's cost table (implementation of LesOdrRequest)
|
||||||
|
func (r *TxStatusRequest) GetCost(peer *peer) uint64 {
|
||||||
|
return peer.GetRequestCost(GetTxStatusMsg, len(r.Hashes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (r *TxStatusRequest) CanSend(peer *peer) bool {
|
||||||
|
return peer.version >= lpv2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
|
func (r *TxStatusRequest) Request(reqID uint64, peer *peer) error {
|
||||||
|
peer.Log().Debug("Requesting transaction status", "count", len(r.Hashes))
|
||||||
|
return peer.RequestTxStatus(reqID, r.GetCost(peer), r.Hashes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Valid processes an ODR request reply message from the LES network
|
||||||
|
// returns true and stores results in memory if the message was a valid reply
|
||||||
|
// to the request (implementation of LesOdrRequest)
|
||||||
|
func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error {
|
||||||
|
log.Debug("Validating transaction status", "count", len(r.Hashes))
|
||||||
|
|
||||||
|
// Ensure we have a correct message with a single block body
|
||||||
|
if msg.MsgType != MsgTxStatus {
|
||||||
|
return errInvalidMessageType
|
||||||
|
}
|
||||||
|
status := msg.Obj.([]light.TxStatus)
|
||||||
|
if len(status) != len(r.Hashes) {
|
||||||
|
return errInvalidEntryCount
|
||||||
|
}
|
||||||
|
r.Status = status
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// readTraceDB stores the keys of database reads. We use this to check that received node
|
// readTraceDB stores the keys of database reads. We use this to check that received node
|
||||||
// sets contain only the trie nodes necessary to make proofs pass.
|
// sets contain only the trie nodes necessary to make proofs pass.
|
||||||
type readTraceDB struct {
|
type readTraceDB struct {
|
||||||
|
@ -38,7 +38,7 @@ import (
|
|||||||
|
|
||||||
type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte
|
type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte
|
||||||
|
|
||||||
func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, odrGetBlock) }
|
func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetBlock) }
|
||||||
|
|
||||||
func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
||||||
var block *types.Block
|
var block *types.Block
|
||||||
@ -54,7 +54,7 @@ func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainCon
|
|||||||
return rlp
|
return rlp
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, odrGetReceipts) }
|
func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetReceipts) }
|
||||||
|
|
||||||
func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
||||||
var receipts types.Receipts
|
var receipts types.Receipts
|
||||||
@ -74,7 +74,7 @@ func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.Chain
|
|||||||
return rlp
|
return rlp
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, odrAccounts) }
|
func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrAccounts) }
|
||||||
|
|
||||||
func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
||||||
dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678")
|
dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678")
|
||||||
@ -102,7 +102,7 @@ func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainCon
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, odrContractCall) }
|
func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, true, odrContractCall) }
|
||||||
|
|
||||||
type callmsg struct {
|
type callmsg struct {
|
||||||
types.Message
|
types.Message
|
||||||
@ -151,8 +151,32 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOdrTxStatusLes2(t *testing.T) { testOdr(t, 2, 1, false, odrTxStatus) }
|
||||||
|
|
||||||
|
func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
|
||||||
|
var txs types.Transactions
|
||||||
|
if bc != nil {
|
||||||
|
block := bc.GetBlockByHash(bhash)
|
||||||
|
txs = block.Transactions()
|
||||||
|
} else {
|
||||||
|
if block, _ := lc.GetBlockByHash(ctx, bhash); block != nil {
|
||||||
|
btxs := block.Transactions()
|
||||||
|
txs = make(types.Transactions, len(btxs))
|
||||||
|
for i, tx := range btxs {
|
||||||
|
var err error
|
||||||
|
txs[i], _, _, _, err = light.GetTransaction(ctx, lc.Odr(), tx.Hash())
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rlp, _ := rlp.EncodeToBytes(txs)
|
||||||
|
return rlp
|
||||||
|
}
|
||||||
|
|
||||||
// testOdr tests odr requests whose validation guaranteed by block headers.
|
// testOdr tests odr requests whose validation guaranteed by block headers.
|
||||||
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
|
func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
|
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
|
||||||
defer tearDown()
|
defer tearDown()
|
||||||
@ -193,9 +217,10 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
|
|||||||
client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
|
client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
|
||||||
client.peers.lock.Unlock()
|
client.peers.lock.Unlock()
|
||||||
test(5)
|
test(5)
|
||||||
|
if checkCached {
|
||||||
// still expect all retrievals to pass, now data should be cached locally
|
// still expect all retrievals to pass, now data should be cached locally
|
||||||
client.peers.Unregister(client.rPeer.id)
|
client.peers.Unregister(client.rPeer.id)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
test(5)
|
test(5)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
@ -317,7 +317,7 @@ func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
|
// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
|
||||||
func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply {
|
func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply {
|
||||||
data, _ := rlp.EncodeToBytes(stats)
|
data, _ := rlp.EncodeToBytes(stats)
|
||||||
return &reply{p.rw, TxStatusMsg, reqID, data}
|
return &reply{p.rw, TxStatusMsg, reqID, data}
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,6 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
@ -227,9 +225,3 @@ type CodeData []struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type proofsData [][]rlp.RawValue
|
type proofsData [][]rlp.RawValue
|
||||||
|
|
||||||
type txStatus struct {
|
|
||||||
Status core.TxStatus
|
|
||||||
Lookup *rawdb.LegacyTxLookupEntry `rlp:"nil"`
|
|
||||||
Error string
|
|
||||||
}
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package les
|
package les
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -36,21 +37,27 @@ type LesTxRelay struct {
|
|||||||
peerList []*peer
|
peerList []*peer
|
||||||
peerStartPos int
|
peerStartPos int
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
stop chan struct{}
|
||||||
|
|
||||||
reqDist *requestDistributor
|
retriever *retrieveManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay {
|
func NewLesTxRelay(ps *peerSet, retriever *retrieveManager) *LesTxRelay {
|
||||||
r := &LesTxRelay{
|
r := &LesTxRelay{
|
||||||
txSent: make(map[common.Hash]*ltrInfo),
|
txSent: make(map[common.Hash]*ltrInfo),
|
||||||
txPending: make(map[common.Hash]struct{}),
|
txPending: make(map[common.Hash]struct{}),
|
||||||
ps: ps,
|
ps: ps,
|
||||||
reqDist: reqDist,
|
retriever: retriever,
|
||||||
|
stop: make(chan struct{}),
|
||||||
}
|
}
|
||||||
ps.notify(r)
|
ps.notify(r)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *LesTxRelay) Stop() {
|
||||||
|
close(self.stop)
|
||||||
|
}
|
||||||
|
|
||||||
func (self *LesTxRelay) registerPeer(p *peer) {
|
func (self *LesTxRelay) registerPeer(p *peer) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
@ -132,7 +139,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
|
|||||||
return func() { peer.SendTxs(reqID, cost, enc) }
|
return func() { peer.SendTxs(reqID, cost, enc) }
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
self.reqDist.queue(rq)
|
go self.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, self.stop)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
17
light/odr.go
17
light/odr.go
@ -175,3 +175,20 @@ func (req *BloomRequest) StoreResult(db ethdb.Database) {
|
|||||||
rawdb.WriteBloomBits(db, req.BitIdx, sectionIdx, sectionHead, req.BloomBits[i])
|
rawdb.WriteBloomBits(db, req.BitIdx, sectionIdx, sectionHead, req.BloomBits[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TxStatus describes the status of a transaction
|
||||||
|
type TxStatus struct {
|
||||||
|
Status core.TxStatus
|
||||||
|
Lookup *rawdb.LegacyTxLookupEntry `rlp:"nil"`
|
||||||
|
Error string
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxStatusRequest is the ODR request type for retrieving transaction status
|
||||||
|
type TxStatusRequest struct {
|
||||||
|
OdrRequest
|
||||||
|
Hashes []common.Hash
|
||||||
|
Status []TxStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreResult stores the retrieved data in local database
|
||||||
|
func (req *TxStatusRequest) StoreResult(db ethdb.Database) {}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
@ -227,3 +228,23 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain
|
||||||
|
func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
|
||||||
|
r := &TxStatusRequest{Hashes: []common.Hash{txHash}}
|
||||||
|
if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded {
|
||||||
|
return nil, common.Hash{}, 0, 0, err
|
||||||
|
} else {
|
||||||
|
pos := r.Status[0].Lookup
|
||||||
|
// first ensure that we have the header, otherwise block body retrieval will fail
|
||||||
|
// also verify if this is a canonical block by getting the header by number and checking its hash
|
||||||
|
if header, err := GetHeaderByNumber(ctx, odr, pos.BlockIndex); err != nil || header.Hash() != pos.BlockHash {
|
||||||
|
return nil, common.Hash{}, 0, 0, err
|
||||||
|
}
|
||||||
|
if body, err := GetBody(ctx, odr, pos.BlockHash, pos.BlockIndex); err != nil || uint64(len(body.Transactions)) <= pos.Index || body.Transactions[pos.Index].Hash() != txHash {
|
||||||
|
return nil, common.Hash{}, 0, 0, err
|
||||||
|
} else {
|
||||||
|
return body.Transactions[pos.Index], pos.BlockHash, pos.BlockIndex, pos.Index, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user