les, light: add block availability check for ODR requests
This commit is contained in:
parent
c57c54ce96
commit
f12f8a6c14
@ -200,6 +200,12 @@ func (f *lightFetcher) syncLoop() {
|
|||||||
|
|
||||||
// addPeer adds a new peer to the fetcher's peer set
|
// addPeer adds a new peer to the fetcher's peer set
|
||||||
func (f *lightFetcher) addPeer(p *peer) {
|
func (f *lightFetcher) addPeer(p *peer) {
|
||||||
|
p.lock.Lock()
|
||||||
|
p.hasBlock = func(hash common.Hash, number uint64) bool {
|
||||||
|
return f.peerHasBlock(p, hash, number)
|
||||||
|
}
|
||||||
|
p.lock.Unlock()
|
||||||
|
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
@ -208,6 +214,10 @@ func (f *lightFetcher) addPeer(p *peer) {
|
|||||||
|
|
||||||
// removePeer removes a new peer from the fetcher's peer set
|
// removePeer removes a new peer from the fetcher's peer set
|
||||||
func (f *lightFetcher) removePeer(p *peer) {
|
func (f *lightFetcher) removePeer(p *peer) {
|
||||||
|
p.lock.Lock()
|
||||||
|
p.hasBlock = nil
|
||||||
|
p.lock.Unlock()
|
||||||
|
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
@ -315,7 +325,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
|
|||||||
// based on its announcements
|
// based on its announcements
|
||||||
func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
|
func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Lock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
fp := f.peers[p]
|
fp := f.peers[p]
|
||||||
if fp == nil || fp.root == nil {
|
if fp == nil || fp.root == nil {
|
||||||
|
@ -188,6 +188,9 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
|
|||||||
var p *peer
|
var p *peer
|
||||||
if self.serverPool != nil {
|
if self.serverPool != nil {
|
||||||
p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
|
p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
|
||||||
|
if !lreq.CanSend(p) {
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
return true, p.fcServer.CanSend(lreq.GetCost(p))
|
return true, p.fcServer.CanSend(lreq.GetCost(p))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
|
|
||||||
type LesOdrRequest interface {
|
type LesOdrRequest interface {
|
||||||
GetCost(*peer) uint64
|
GetCost(*peer) uint64
|
||||||
|
CanSend(*peer) bool
|
||||||
Request(uint64, *peer) error
|
Request(uint64, *peer) error
|
||||||
Valid(ethdb.Database, *Msg) bool // if true, keeps the retrieved object
|
Valid(ethdb.Database, *Msg) bool // if true, keeps the retrieved object
|
||||||
}
|
}
|
||||||
@ -66,6 +67,11 @@ func (self *BlockRequest) GetCost(peer *peer) uint64 {
|
|||||||
return peer.GetRequestCost(GetBlockBodiesMsg, 1)
|
return peer.GetRequestCost(GetBlockBodiesMsg, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (self *BlockRequest) CanSend(peer *peer) bool {
|
||||||
|
return peer.HasBlock(self.Hash, self.Number)
|
||||||
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
func (self *BlockRequest) Request(reqID uint64, peer *peer) error {
|
func (self *BlockRequest) Request(reqID uint64, peer *peer) error {
|
||||||
glog.V(logger.Debug).Infof("ODR: requesting body of block %08x from peer %v", self.Hash[:4], peer.id)
|
glog.V(logger.Debug).Infof("ODR: requesting body of block %08x from peer %v", self.Hash[:4], peer.id)
|
||||||
@ -121,6 +127,11 @@ func (self *ReceiptsRequest) GetCost(peer *peer) uint64 {
|
|||||||
return peer.GetRequestCost(GetReceiptsMsg, 1)
|
return peer.GetRequestCost(GetReceiptsMsg, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (self *ReceiptsRequest) CanSend(peer *peer) bool {
|
||||||
|
return peer.HasBlock(self.Hash, self.Number)
|
||||||
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
func (self *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
|
func (self *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
|
||||||
glog.V(logger.Debug).Infof("ODR: requesting receipts for block %08x from peer %v", self.Hash[:4], peer.id)
|
glog.V(logger.Debug).Infof("ODR: requesting receipts for block %08x from peer %v", self.Hash[:4], peer.id)
|
||||||
@ -171,6 +182,11 @@ func (self *TrieRequest) GetCost(peer *peer) uint64 {
|
|||||||
return peer.GetRequestCost(GetProofsMsg, 1)
|
return peer.GetRequestCost(GetProofsMsg, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (self *TrieRequest) CanSend(peer *peer) bool {
|
||||||
|
return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
func (self *TrieRequest) Request(reqID uint64, peer *peer) error {
|
func (self *TrieRequest) Request(reqID uint64, peer *peer) error {
|
||||||
glog.V(logger.Debug).Infof("ODR: requesting trie root %08x key %08x from peer %v", self.Id.Root[:4], self.Key[:4], peer.id)
|
glog.V(logger.Debug).Infof("ODR: requesting trie root %08x key %08x from peer %v", self.Id.Root[:4], self.Key[:4], peer.id)
|
||||||
@ -221,6 +237,11 @@ func (self *CodeRequest) GetCost(peer *peer) uint64 {
|
|||||||
return peer.GetRequestCost(GetCodeMsg, 1)
|
return peer.GetRequestCost(GetCodeMsg, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (self *CodeRequest) CanSend(peer *peer) bool {
|
||||||
|
return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
func (self *CodeRequest) Request(reqID uint64, peer *peer) error {
|
func (self *CodeRequest) Request(reqID uint64, peer *peer) error {
|
||||||
glog.V(logger.Debug).Infof("ODR: requesting node data for hash %08x from peer %v", self.Hash[:4], peer.id)
|
glog.V(logger.Debug).Infof("ODR: requesting node data for hash %08x from peer %v", self.Hash[:4], peer.id)
|
||||||
@ -274,6 +295,14 @@ func (self *ChtRequest) GetCost(peer *peer) uint64 {
|
|||||||
return peer.GetRequestCost(GetHeaderProofsMsg, 1)
|
return peer.GetRequestCost(GetHeaderProofsMsg, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CanSend tells if a certain peer is suitable for serving the given request
|
||||||
|
func (self *ChtRequest) CanSend(peer *peer) bool {
|
||||||
|
peer.lock.RLock()
|
||||||
|
defer peer.lock.RUnlock()
|
||||||
|
|
||||||
|
return self.ChtNum <= (peer.headInfo.Number-light.ChtConfirmations)/light.ChtFrequency
|
||||||
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
func (self *ChtRequest) Request(reqID uint64, peer *peer) error {
|
func (self *ChtRequest) Request(reqID uint64, peer *peer) error {
|
||||||
glog.V(logger.Debug).Infof("ODR: requesting CHT #%d block #%d from peer %v", self.ChtNum, self.BlockNum, peer.id)
|
glog.V(logger.Debug).Infof("ODR: requesting CHT #%d block #%d from peer %v", self.ChtNum, self.BlockNum, peer.id)
|
||||||
|
12
les/peer.go
12
les/peer.go
@ -57,6 +57,7 @@ type peer struct {
|
|||||||
announceChn chan announceData
|
announceChn chan announceData
|
||||||
|
|
||||||
poolEntry *poolEntry
|
poolEntry *poolEntry
|
||||||
|
hasBlock func(common.Hash, uint64) bool
|
||||||
|
|
||||||
fcClient *flowcontrol.ClientNode // nil if the peer is server only
|
fcClient *flowcontrol.ClientNode // nil if the peer is server only
|
||||||
fcServer *flowcontrol.ServerNode // nil if the peer is client only
|
fcServer *flowcontrol.ServerNode // nil if the peer is client only
|
||||||
@ -135,6 +136,9 @@ func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{})
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
|
func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
|
||||||
|
p.lock.RLock()
|
||||||
|
defer p.lock.RUnlock()
|
||||||
|
|
||||||
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
|
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
|
||||||
if cost > p.fcServerParams.BufLimit {
|
if cost > p.fcServerParams.BufLimit {
|
||||||
cost = p.fcServerParams.BufLimit
|
cost = p.fcServerParams.BufLimit
|
||||||
@ -142,6 +146,14 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
|
|||||||
return cost
|
return cost
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks if the peer has a given block
|
||||||
|
func (p *peer) HasBlock(hash common.Hash, number uint64) bool {
|
||||||
|
p.lock.RLock()
|
||||||
|
hashBlock := p.hasBlock
|
||||||
|
p.lock.RUnlock()
|
||||||
|
return hashBlock != nil && hashBlock(hash, number)
|
||||||
|
}
|
||||||
|
|
||||||
// SendAnnounce announces the availability of a number of blocks through
|
// SendAnnounce announces the availability of a number of blocks through
|
||||||
// a hash notification.
|
// a hash notification.
|
||||||
func (p *peer) SendAnnounce(request announceData) error {
|
func (p *peer) SendAnnounce(request announceData) error {
|
||||||
|
@ -351,7 +351,6 @@ func (pm *ProtocolManager) blockLoop() {
|
|||||||
var (
|
var (
|
||||||
lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian)
|
lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian)
|
||||||
chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
|
chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
|
||||||
chtConfirmations = light.ChtFrequency / 2
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func getChtRoot(db ethdb.Database, num uint64) common.Hash {
|
func getChtRoot(db ethdb.Database, num uint64) common.Hash {
|
||||||
@ -372,8 +371,8 @@ func makeCht(db ethdb.Database) bool {
|
|||||||
headNum := core.GetBlockNumber(db, headHash)
|
headNum := core.GetBlockNumber(db, headHash)
|
||||||
|
|
||||||
var newChtNum uint64
|
var newChtNum uint64
|
||||||
if headNum > chtConfirmations {
|
if headNum > light.ChtConfirmations {
|
||||||
newChtNum = (headNum - chtConfirmations) / light.ChtFrequency
|
newChtNum = (headNum - light.ChtConfirmations) / light.ChtFrequency
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastChtNum uint64
|
var lastChtNum uint64
|
||||||
|
@ -48,6 +48,7 @@ type OdrRequest interface {
|
|||||||
// TrieID identifies a state or account storage trie
|
// TrieID identifies a state or account storage trie
|
||||||
type TrieID struct {
|
type TrieID struct {
|
||||||
BlockHash, Root common.Hash
|
BlockHash, Root common.Hash
|
||||||
|
BlockNumber uint64
|
||||||
AccKey []byte
|
AccKey []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,6 +57,7 @@ type TrieID struct {
|
|||||||
func StateTrieID(header *types.Header) *TrieID {
|
func StateTrieID(header *types.Header) *TrieID {
|
||||||
return &TrieID{
|
return &TrieID{
|
||||||
BlockHash: header.Hash(),
|
BlockHash: header.Hash(),
|
||||||
|
BlockNumber: header.Number.Uint64(),
|
||||||
AccKey: nil,
|
AccKey: nil,
|
||||||
Root: header.Root,
|
Root: header.Root,
|
||||||
}
|
}
|
||||||
@ -67,6 +69,7 @@ func StateTrieID(header *types.Header) *TrieID {
|
|||||||
func StorageTrieID(state *TrieID, addr common.Address, root common.Hash) *TrieID {
|
func StorageTrieID(state *TrieID, addr common.Address, root common.Hash) *TrieID {
|
||||||
return &TrieID{
|
return &TrieID{
|
||||||
BlockHash: state.BlockHash,
|
BlockHash: state.BlockHash,
|
||||||
|
BlockNumber: state.BlockNumber,
|
||||||
AccKey: crypto.Keccak256(addr[:]),
|
AccKey: crypto.Keccak256(addr[:]),
|
||||||
Root: root,
|
Root: root,
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,7 @@ var (
|
|||||||
ErrNoHeader = errors.New("Header not found")
|
ErrNoHeader = errors.New("Header not found")
|
||||||
|
|
||||||
ChtFrequency = uint64(4096)
|
ChtFrequency = uint64(4096)
|
||||||
|
ChtConfirmations = uint64(2048)
|
||||||
trustedChtKey = []byte("TrustedCHT")
|
trustedChtKey = []byte("TrustedCHT")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user