diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 11a7c96fa..b80b517d9 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -322,7 +322,6 @@ func (c *ChainIndexer) updateLoop() { updating = false c.log.Info("Finished upgrading chain index") } - c.cascadedHead = c.storedSections*c.sectionSize - 1 for _, child := range c.children { c.log.Trace("Cascading chain index update", "head", c.cascadedHead) diff --git a/eth/backend.go b/eth/backend.go index c530d3c7c..da7e0b2cd 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -136,7 +136,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { gasPrice: config.MinerGasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) @@ -426,7 +426,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol { // Ethereum protocol implementation. func (s *Ethereum) Start(srvr *p2p.Server) error { // Start the bloom bits servicing goroutines - s.startBloomHandlers() + s.startBloomHandlers(params.BloomBitsBlocks) // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) diff --git a/eth/bloombits.go b/eth/bloombits.go index eb18565e2..c7bb56140 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/params" ) const ( @@ -50,7 +49,7 @@ const ( // startBloomHandlers starts a batch of goroutines to accept bloom bit database // retrievals from possibly a range of filters and serving the data to satisfy. -func (eth *Ethereum) startBloomHandlers() { +func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { for i := 0; i < bloomServiceThreads; i++ { go func() { for { @@ -62,9 +61,9 @@ func (eth *Ethereum) startBloomHandlers() { task := <-request task.Bitsets = make([][]byte, len(task.Sections)) for i, section := range task.Sections { - head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1) + head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { - if blob, err := bitutil.DecompressBytes(compVector, int(params.BloomBitsBlocks)/8); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { task.Bitsets[i] = blob } else { task.Error = err @@ -81,10 +80,6 @@ func (eth *Ethereum) startBloomHandlers() { } const ( - // bloomConfirms is the number of confirmation blocks before a bloom section is - // considered probably final and its rotated bits are calculated. - bloomConfirms = 256 - // bloomThrottling is the time to wait between processing two consecutive index // sections. It's useful during chain upgrades to prevent disk overload. bloomThrottling = 100 * time.Millisecond @@ -102,14 +97,14 @@ type BloomIndexer struct { // NewBloomIndexer returns a chain indexer that generates bloom bits data for the // canonical chain for fast logs filtering. -func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer { +func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer { backend := &BloomIndexer{ db: db, size: size, } table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) - return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits") + return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") } // Reset implements core.ChainIndexerBackend, starting a new bloombits index diff --git a/les/api_backend.go b/les/api_backend.go index 4232d3ae0..aa748a4ea 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -192,7 +192,7 @@ func (b *LesApiBackend) BloomStatus() (uint64, uint64) { return 0, 0 } sections, _, _ := b.eth.bloomIndexer.Sections() - return light.BloomTrieFrequency, sections + return params.BloomBitsBlocksClient, sections } func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { diff --git a/les/backend.go b/les/backend.go index 00025ba63..75049da08 100644 --- a/les/backend.go +++ b/les/backend.go @@ -95,6 +95,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { lesCommons: lesCommons{ chainDb: chainDb, config: config, + iConfig: light.DefaultClientIndexerConfig, }, chainConfig: chainConfig, eventMux: ctx.EventMux, @@ -105,16 +106,16 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { shutdownChan: make(chan bool), networkId: config.NetworkId, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations), + bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), } leth.relay = NewLesTxRelay(peers, leth.reqDist) leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) - leth.odr = NewLesOdr(chainDb, leth.retriever) - leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr) - leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr) + leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) + leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequencyClient, params.HelperTrieConfirmations) + leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency) leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) // Note: NewLightChain adds the trusted checkpoint so it needs an ODR with @@ -135,7 +136,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay) - if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil { + if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil { return nil, err } leth.ApiBackend = &LesApiBackend{leth, nil} @@ -230,8 +231,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol { // Start implements node.Service, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *LightEthereum) Start(srvr *p2p.Server) error { - s.startBloomHandlers() log.Warn("Light client mode is an experimental feature") + s.startBloomHandlers(params.BloomBitsBlocksClient) s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId) // clients are searching for the first advertised protocol in the list protocolVersion := AdvertiseProtocolVersions[0] diff --git a/les/bloombits.go b/les/bloombits.go index 2871a9006..aea0fcd5f 100644 --- a/les/bloombits.go +++ b/les/bloombits.go @@ -43,7 +43,7 @@ const ( // startBloomHandlers starts a batch of goroutines to accept bloom bit database // retrievals from possibly a range of filters and serving the data to satisfy. -func (eth *LightEthereum) startBloomHandlers() { +func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) { for i := 0; i < bloomServiceThreads; i++ { go func() { for { @@ -57,7 +57,7 @@ func (eth *LightEthereum) startBloomHandlers() { compVectors, err := light.GetBloomBits(task.Context, eth.odr, task.Bit, task.Sections) if err == nil { for i := range task.Sections { - if blob, err := bitutil.DecompressBytes(compVectors[i], int(light.BloomTrieFrequency/8)); err == nil { + if blob, err := bitutil.DecompressBytes(compVectors[i], int(sectionSize/8)); err == nil { task.Bitsets[i] = blob } else { task.Error = err diff --git a/les/commons.go b/les/commons.go index d8e941295..a97687993 100644 --- a/les/commons.go +++ b/les/commons.go @@ -33,6 +33,7 @@ import ( // lesCommons contains fields needed by both server and client. type lesCommons struct { config *eth.Config + iConfig *light.IndexerConfig chainDb ethdb.Database protocolManager *ProtocolManager chtIndexer, bloomTrieIndexer *core.ChainIndexer @@ -81,7 +82,7 @@ func (c *lesCommons) nodeInfo() interface{} { if !c.protocolManager.lightSync { // convert to client section size if running in server mode - sections /= light.CHTFrequencyClient / light.CHTFrequencyServer + sections /= c.iConfig.PairChtSize / c.iConfig.ChtSize } if sections2 < sections { @@ -94,7 +95,8 @@ func (c *lesCommons) nodeInfo() interface{} { if c.protocolManager.lightSync { chtRoot = light.GetChtRoot(c.chainDb, sectionIndex, sectionHead) } else { - chtRoot = light.GetChtV2Root(c.chainDb, sectionIndex, sectionHead) + idxV2 := (sectionIndex+1)*c.iConfig.PairChtSize/c.iConfig.ChtSize - 1 + chtRoot = light.GetChtRoot(c.chainDb, idxV2, sectionHead) } cht = light.TrustedCheckpoint{ SectionIdx: sectionIndex, diff --git a/les/handler.go b/les/handler.go index ca40eaabf..243a6dabd 100644 --- a/les/handler.go +++ b/les/handler.go @@ -94,6 +94,7 @@ type ProtocolManager struct { txrelay *LesTxRelay networkId uint64 chainConfig *params.ChainConfig + iConfig *light.IndexerConfig blockchain BlockChain chainDb ethdb.Database odr *LesOdr @@ -123,13 +124,14 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { +func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, eventMux: mux, blockchain: blockchain, chainConfig: chainConfig, + iConfig: indexerConfig, chainDb: chainDb, odr: odr, networkId: networkId, @@ -882,7 +884,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix)) for _, req := range req.Reqs { if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil { - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*light.CHTFrequencyServer-1) + sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1) if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) { trie, err := trie.New(root, trieDb) if err != nil { @@ -1137,10 +1139,11 @@ func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common. func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) { switch id { case htCanonical: - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.CHTFrequencyClient-1) - return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix + idxV1 := (idx+1)*(pm.iConfig.PairChtSize/pm.iConfig.ChtSize) - 1 + sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idxV1+1)*pm.iConfig.ChtSize-1) + return light.GetChtRoot(pm.chainDb, idxV1, sectionHead), light.ChtTablePrefix case htBloomBits: - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1) + sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1) return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix } return common.Hash{}, "" diff --git a/les/handler_test.go b/les/handler_test.go index 31aad3ed4..43be7f41b 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -51,10 +51,9 @@ func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) } func TestGetBlockHeadersLes2(t *testing.T) { testGetBlockHeaders(t, 2) } func testGetBlockHeaders(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, ethdb.NewMemDatabase()) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() + server, tearDown := newServerEnv(t, downloader.MaxHashFetch+15, protocol, nil) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) // Create a "random" unknown hash for testing var unknown common.Hash @@ -167,9 +166,9 @@ func testGetBlockHeaders(t *testing.T, protocol int) { } // Send the hash request and verify the response reqID++ - cost := peer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount)) - sendRequest(peer.app, GetBlockHeadersMsg, reqID, cost, tt.query) - if err := expectResponse(peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { + cost := server.tPeer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount)) + sendRequest(server.tPeer.app, GetBlockHeadersMsg, reqID, cost, tt.query) + if err := expectResponse(server.tPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { t.Errorf("test %d: headers mismatch: %v", i, err) } } @@ -180,10 +179,9 @@ func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) } func TestGetBlockBodiesLes2(t *testing.T) { testGetBlockBodies(t, 2) } func testGetBlockBodies(t *testing.T, protocol int) { - pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, ethdb.NewMemDatabase()) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() + server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) // Create a batch of tests for various scenarios limit := MaxBodyFetch @@ -243,9 +241,9 @@ func testGetBlockBodies(t *testing.T, protocol int) { } reqID++ // Send the hash request and verify the response - cost := peer.GetRequestCost(GetBlockBodiesMsg, len(hashes)) - sendRequest(peer.app, GetBlockBodiesMsg, reqID, cost, hashes) - if err := expectResponse(peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { + cost := server.tPeer.GetRequestCost(GetBlockBodiesMsg, len(hashes)) + sendRequest(server.tPeer.app, GetBlockBodiesMsg, reqID, cost, hashes) + if err := expectResponse(server.tPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { t.Errorf("test %d: bodies mismatch: %v", i, err) } } @@ -257,10 +255,9 @@ func TestGetCodeLes2(t *testing.T) { testGetCode(t, 2) } func testGetCode(t *testing.T, protocol int) { // Assemble the test environment - pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, ethdb.NewMemDatabase()) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() + server, tearDown := newServerEnv(t, 4, protocol, nil) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) var codereqs []*CodeReq var codes [][]byte @@ -277,9 +274,9 @@ func testGetCode(t *testing.T, protocol int) { } } - cost := peer.GetRequestCost(GetCodeMsg, len(codereqs)) - sendRequest(peer.app, GetCodeMsg, 42, cost, codereqs) - if err := expectResponse(peer.app, CodeMsg, 42, testBufLimit, codes); err != nil { + cost := server.tPeer.GetRequestCost(GetCodeMsg, len(codereqs)) + sendRequest(server.tPeer.app, GetCodeMsg, 42, cost, codereqs) + if err := expectResponse(server.tPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil { t.Errorf("codes mismatch: %v", err) } } @@ -290,11 +287,9 @@ func TestGetReceiptLes2(t *testing.T) { testGetReceipt(t, 2) } func testGetReceipt(t *testing.T, protocol int) { // Assemble the test environment - db := ethdb.NewMemDatabase() - pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() + server, tearDown := newServerEnv(t, 4, protocol, nil) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) // Collect the hashes to request, and the response to expect hashes, receipts := []common.Hash{}, []types.Receipts{} @@ -302,12 +297,12 @@ func testGetReceipt(t *testing.T, protocol int) { block := bc.GetBlockByNumber(i) hashes = append(hashes, block.Hash()) - receipts = append(receipts, rawdb.ReadReceipts(db, block.Hash(), block.NumberU64())) + receipts = append(receipts, rawdb.ReadReceipts(server.db, block.Hash(), block.NumberU64())) } // Send the hash request and verify the response - cost := peer.GetRequestCost(GetReceiptsMsg, len(hashes)) - sendRequest(peer.app, GetReceiptsMsg, 42, cost, hashes) - if err := expectResponse(peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { + cost := server.tPeer.GetRequestCost(GetReceiptsMsg, len(hashes)) + sendRequest(server.tPeer.app, GetReceiptsMsg, 42, cost, hashes) + if err := expectResponse(server.tPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { t.Errorf("receipts mismatch: %v", err) } } @@ -318,11 +313,9 @@ func TestGetProofsLes2(t *testing.T) { testGetProofs(t, 2) } func testGetProofs(t *testing.T, protocol int) { // Assemble the test environment - db := ethdb.NewMemDatabase() - pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() + server, tearDown := newServerEnv(t, 4, protocol, nil) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) var ( proofreqs []ProofReq @@ -334,7 +327,7 @@ func testGetProofs(t *testing.T, protocol int) { for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ { header := bc.GetHeaderByNumber(i) root := header.Root - trie, _ := trie.New(root, trie.NewDatabase(db)) + trie, _ := trie.New(root, trie.NewDatabase(server.db)) for _, acc := range accounts { req := ProofReq{ @@ -356,15 +349,15 @@ func testGetProofs(t *testing.T, protocol int) { // Send the proof request and verify the response switch protocol { case 1: - cost := peer.GetRequestCost(GetProofsV1Msg, len(proofreqs)) - sendRequest(peer.app, GetProofsV1Msg, 42, cost, proofreqs) - if err := expectResponse(peer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil { + cost := server.tPeer.GetRequestCost(GetProofsV1Msg, len(proofreqs)) + sendRequest(server.tPeer.app, GetProofsV1Msg, 42, cost, proofreqs) + if err := expectResponse(server.tPeer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil { t.Errorf("proofs mismatch: %v", err) } case 2: - cost := peer.GetRequestCost(GetProofsV2Msg, len(proofreqs)) - sendRequest(peer.app, GetProofsV2Msg, 42, cost, proofreqs) - if err := expectResponse(peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { + cost := server.tPeer.GetRequestCost(GetProofsV2Msg, len(proofreqs)) + sendRequest(server.tPeer.app, GetProofsV2Msg, 42, cost, proofreqs) + if err := expectResponse(server.tPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { t.Errorf("proofs mismatch: %v", err) } } @@ -375,28 +368,33 @@ func TestGetCHTProofsLes1(t *testing.T) { testGetCHTProofs(t, 1) } func TestGetCHTProofsLes2(t *testing.T) { testGetCHTProofs(t, 2) } func testGetCHTProofs(t *testing.T, protocol int) { - // Figure out the client's CHT frequency - frequency := uint64(light.CHTFrequencyClient) - if protocol == 1 { - frequency = uint64(light.CHTFrequencyServer) + config := light.TestServerIndexerConfig + frequency := config.ChtSize + if protocol == 2 { + frequency = config.PairChtSize } - // Assemble the test environment - db := ethdb.NewMemDatabase() - pm := newTestProtocolManagerMust(t, false, int(frequency)+light.HelperTrieProcessConfirmations, testChainGen, nil, nil, db) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - defer peer.close() - // Wait a while for the CHT indexer to process the new headers - time.Sleep(100 * time.Millisecond * time.Duration(frequency/light.CHTFrequencyServer)) // Chain indexer throttling - time.Sleep(250 * time.Millisecond) // CI tester slack + waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + expectSections := frequency / config.ChtSize + for { + cs, _, _ := cIndexer.Sections() + bs, _, _ := bIndexer.Sections() + if cs >= expectSections && bs >= expectSections { + break + } + time.Sleep(10 * time.Millisecond) + } + } + server, tearDown := newServerEnv(t, int(frequency+config.ChtConfirms), protocol, waitIndexers) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) // Assemble the proofs from the different protocols - header := bc.GetHeaderByNumber(frequency) + header := bc.GetHeaderByNumber(frequency - 1) rlp, _ := rlp.EncodeToBytes(header) key := make([]byte, 8) - binary.BigEndian.PutUint64(key, frequency) + binary.BigEndian.PutUint64(key, frequency-1) proofsV1 := []ChtResp{{ Header: header, @@ -406,41 +404,41 @@ func testGetCHTProofs(t *testing.T, protocol int) { } switch protocol { case 1: - root := light.GetChtRoot(db, 0, bc.GetHeaderByNumber(frequency-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix))) + root := light.GetChtRoot(server.db, 0, bc.GetHeaderByNumber(frequency-1).Hash()) + trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix))) var proof light.NodeList trie.Prove(key, 0, &proof) proofsV1[0].Proof = proof case 2: - root := light.GetChtV2Root(db, 0, bc.GetHeaderByNumber(frequency-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix))) + root := light.GetChtRoot(server.db, (frequency/config.ChtSize)-1, bc.GetHeaderByNumber(frequency-1).Hash()) + trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix))) trie.Prove(key, 0, &proofsV2.Proofs) } // Assemble the requests for the different protocols requestsV1 := []ChtReq{{ - ChtNum: 1, - BlockNum: frequency, + ChtNum: frequency / config.ChtSize, + BlockNum: frequency - 1, }} requestsV2 := []HelperTrieReq{{ Type: htCanonical, - TrieIdx: 0, + TrieIdx: frequency/config.PairChtSize - 1, Key: key, AuxReq: auxHeader, }} // Send the proof request and verify the response switch protocol { case 1: - cost := peer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1)) - sendRequest(peer.app, GetHeaderProofsMsg, 42, cost, requestsV1) - if err := expectResponse(peer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil { + cost := server.tPeer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1)) + sendRequest(server.tPeer.app, GetHeaderProofsMsg, 42, cost, requestsV1) + if err := expectResponse(server.tPeer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil { t.Errorf("proofs mismatch: %v", err) } case 2: - cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2)) - sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2) - if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { + cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2)) + sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2) + if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { t.Errorf("proofs mismatch: %v", err) } } @@ -448,24 +446,31 @@ func testGetCHTProofs(t *testing.T, protocol int) { // Tests that bloombits proofs can be correctly retrieved. func TestGetBloombitsProofs(t *testing.T) { - // Assemble the test environment - db := ethdb.NewMemDatabase() - pm := newTestProtocolManagerMust(t, false, light.BloomTrieFrequency+256, testChainGen, nil, nil, db) - bc := pm.blockchain.(*core.BlockChain) - peer, _ := newTestPeer(t, "peer", 2, pm, true) - defer peer.close() + config := light.TestServerIndexerConfig - // Wait a while for the bloombits indexer to process the new headers - time.Sleep(100 * time.Millisecond * time.Duration(light.BloomTrieFrequency/4096)) // Chain indexer throttling - time.Sleep(250 * time.Millisecond) // CI tester slack + waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + for { + cs, _, _ := cIndexer.Sections() + bs, _, _ := bIndexer.Sections() + bts, _, _ := btIndexer.Sections() + if cs >= 8 && bs >= 8 && bts >= 1 { + break + } + time.Sleep(10 * time.Millisecond) + } + } + server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), 2, waitIndexers) + defer tearDown() + bc := server.pm.blockchain.(*core.BlockChain) // Request and verify each bit of the bloom bits proofs for bit := 0; bit < 2048; bit++ { - // Assemble therequest and proofs for the bloombits + // Assemble the request and proofs for the bloombits key := make([]byte, 10) binary.BigEndian.PutUint16(key[:2], uint16(bit)) - binary.BigEndian.PutUint64(key[2:], uint64(light.BloomTrieFrequency)) + // Only the first bloom section has data. + binary.BigEndian.PutUint64(key[2:], 0) requests := []HelperTrieReq{{ Type: htBloomBits, @@ -474,14 +479,14 @@ func TestGetBloombitsProofs(t *testing.T) { }} var proofs HelperTrieResps - root := light.GetBloomTrieRoot(db, 0, bc.GetHeaderByNumber(light.BloomTrieFrequency-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.BloomTrieTablePrefix))) + root := light.GetBloomTrieRoot(server.db, 0, bc.GetHeaderByNumber(config.BloomTrieSize-1).Hash()) + trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.BloomTrieTablePrefix))) trie.Prove(key, 0, &proofs.Proofs) // Send the proof request and verify the response - cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requests)) - sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requests) - if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { + cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requests)) + sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requests) + if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { t.Errorf("bit %d: proofs mismatch: %v", bit, err) } } diff --git a/les/helper_test.go b/les/helper_test.go index 8817c20c7..206ee2d92 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -24,6 +24,7 @@ import ( "math/big" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -123,6 +124,15 @@ func testChainGen(i int, block *core.BlockGen) { } } +// testIndexers creates a set of indexers with specified params for testing purpose. +func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.IndexerConfig) (*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer) { + chtIndexer := light.NewChtIndexer(db, odr, iConfig.ChtSize, iConfig.ChtConfirms) + bloomIndexer := eth.NewBloomIndexer(db, iConfig.BloomSize, iConfig.BloomConfirms) + bloomTrieIndexer := light.NewBloomTrieIndexer(db, odr, iConfig.BloomSize, iConfig.BloomTrieSize) + bloomIndexer.AddChildIndexer(bloomTrieIndexer) + return chtIndexer, bloomIndexer, bloomTrieIndexer +} + func testRCL() RequestCostList { cl := make(RequestCostList, len(reqList)) for i, code := range reqList { @@ -134,9 +144,9 @@ func testRCL() RequestCostList { } // newTestProtocolManager creates a new protocol manager for testing purposes, -// with the given number of blocks already known, and potential notification -// channels for different events. -func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) { +// with the given number of blocks already known, potential notification +// channels for different events and relative chain indexers array. +func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) (*ProtocolManager, error) { var ( evmux = new(event.TypeMux) engine = ethash.NewFaker() @@ -155,16 +165,6 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor chain, _ = light.NewLightChain(odr, gspec.Config, engine) } else { blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) - - chtIndexer := light.NewChtIndexer(db, false, nil) - chtIndexer.Start(blockchain) - - bbtIndexer := light.NewBloomTrieIndexer(db, false, nil) - - bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations) - bloomIndexer.AddChildIndexer(bbtIndexer) - bloomIndexer.Start(blockchain) - gchain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) @@ -172,7 +172,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor chain = blockchain } - pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup)) + indexConfig := light.TestServerIndexerConfig + if lightSync { + indexConfig = light.TestClientIndexerConfig + } + pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup)) if err != nil { return nil, err } @@ -193,11 +197,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } // newTestProtocolManagerMust creates a new protocol manager for testing purposes, -// with the given number of blocks already known, and potential notification -// channels for different events. In case of an error, the constructor force- +// with the given number of blocks already known, potential notification +// channels for different events and relative chain indexers array. In case of an error, the constructor force- // fails the test. -func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager { - pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db) +func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) *ProtocolManager { + pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db) if err != nil { t.Fatalf("Failed to create protocol manager: %v", err) } @@ -320,3 +324,122 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu func (p *testPeer) close() { p.app.Close() } + +// TestEntity represents a network entity for testing with necessary auxiliary fields. +type TestEntity struct { + db ethdb.Database + rPeer *peer + tPeer *testPeer + peers *peerSet + pm *ProtocolManager + // Indexers + chtIndexer *core.ChainIndexer + bloomIndexer *core.ChainIndexer + bloomTrieIndexer *core.ChainIndexer +} + +// newServerEnv creates a server testing environment with a connected test peer for testing purpose. +func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)) (*TestEntity, func()) { + db := ethdb.NewMemDatabase() + cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) + + pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db) + peer, _ := newTestPeer(t, "peer", protocol, pm, true) + + cIndexer.Start(pm.blockchain.(*core.BlockChain)) + bIndexer.Start(pm.blockchain.(*core.BlockChain)) + + // Wait until indexers generate enough index data. + if waitIndexers != nil { + waitIndexers(cIndexer, bIndexer, btIndexer) + } + + return &TestEntity{ + db: db, + tPeer: peer, + pm: pm, + chtIndexer: cIndexer, + bloomIndexer: bIndexer, + bloomTrieIndexer: btIndexer, + }, func() { + peer.close() + // Note bloom trie indexer will be closed by it parent recursively. + cIndexer.Close() + bIndexer.Close() + } +} + +// newClientServerEnv creates a client/server arch environment with a connected les server and light client pair +// for testing purpose. +func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer), newPeer bool) (*TestEntity, *TestEntity, func()) { + db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase() + peers, lPeers := newPeerSet(), newPeerSet() + + dist := newRequestDistributor(lPeers, make(chan struct{})) + rm := newRetrieveManager(lPeers, dist, nil) + odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm) + + cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) + lcIndexer, lbIndexer, lbtIndexer := testIndexers(ldb, odr, light.TestClientIndexerConfig) + odr.SetIndexers(lcIndexer, lbtIndexer, lbIndexer) + + pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db) + lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb) + + startIndexers := func(clientMode bool, pm *ProtocolManager) { + if clientMode { + lcIndexer.Start(pm.blockchain.(*light.LightChain)) + lbIndexer.Start(pm.blockchain.(*light.LightChain)) + } else { + cIndexer.Start(pm.blockchain.(*core.BlockChain)) + bIndexer.Start(pm.blockchain.(*core.BlockChain)) + } + } + + startIndexers(false, pm) + startIndexers(true, lpm) + + // Execute wait until function if it is specified. + if waitIndexers != nil { + waitIndexers(cIndexer, bIndexer, btIndexer) + } + + var ( + peer, lPeer *peer + err1, err2 <-chan error + ) + if newPeer { + peer, err1, lPeer, err2 = newTestPeerPair("peer", protocol, pm, lpm) + select { + case <-time.After(time.Millisecond * 100): + case err := <-err1: + t.Fatalf("peer 1 handshake error: %v", err) + case err := <-err2: + t.Fatalf("peer 2 handshake error: %v", err) + } + } + + return &TestEntity{ + db: db, + pm: pm, + rPeer: peer, + peers: peers, + chtIndexer: cIndexer, + bloomIndexer: bIndexer, + bloomTrieIndexer: btIndexer, + }, &TestEntity{ + db: ldb, + pm: lpm, + rPeer: lPeer, + peers: lPeers, + chtIndexer: lcIndexer, + bloomIndexer: lbIndexer, + bloomTrieIndexer: lbtIndexer, + }, func() { + // Note bloom trie indexers will be closed by their parents recursively. + cIndexer.Close() + bIndexer.Close() + lcIndexer.Close() + lbIndexer.Close() + } +} diff --git a/les/odr.go b/les/odr.go index 2ad28d5d9..9def05a67 100644 --- a/les/odr.go +++ b/les/odr.go @@ -28,16 +28,18 @@ import ( // LesOdr implements light.OdrBackend type LesOdr struct { db ethdb.Database + indexerConfig *light.IndexerConfig chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer retriever *retrieveManager stop chan struct{} } -func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr { +func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr { return &LesOdr{ - db: db, - retriever: retriever, - stop: make(chan struct{}), + db: db, + indexerConfig: config, + retriever: retriever, + stop: make(chan struct{}), } } @@ -73,6 +75,11 @@ func (odr *LesOdr) BloomIndexer() *core.ChainIndexer { return odr.bloomIndexer } +// IndexerConfig returns the indexer config. +func (odr *LesOdr) IndexerConfig() *light.IndexerConfig { + return odr.indexerConfig +} + const ( MsgBlockBodies = iota MsgCode diff --git a/les/odr_requests.go b/les/odr_requests.go index 075fcd92c..9e9b2673f 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -365,7 +365,7 @@ func (r *ChtRequest) CanSend(peer *peer) bool { peer.lock.RLock() defer peer.lock.RUnlock() - return peer.headInfo.Number >= light.HelperTrieConfirmations && r.ChtNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.CHTFrequencyClient + return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) @@ -379,7 +379,21 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error { Key: encNum[:], AuxReq: auxHeader, } - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req}) + switch peer.version { + case lpv1: + var reqsV1 ChtReq + if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 { + return fmt.Errorf("Request invalid in LES/1 mode") + } + blockNum := binary.BigEndian.Uint64(req.Key) + // convert HelperTrie request to old CHT request + reqsV1 = ChtReq{ChtNum: (req.TrieIdx + 1) * (r.Config.ChtSize / r.Config.PairChtSize), BlockNum: blockNum, FromLevel: req.FromLevel} + return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []ChtReq{reqsV1}) + case lpv2: + return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req}) + default: + panic(nil) + } } // Valid processes an ODR request reply message from the LES network @@ -484,7 +498,7 @@ func (r *BloomRequest) CanSend(peer *peer) bool { if peer.version < lpv2 { return false } - return peer.headInfo.Number >= light.HelperTrieConfirmations && r.BloomTrieNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.BloomTrieFrequency + return peer.headInfo.Number >= r.Config.BloomTrieConfirms && r.BloomTrieNum <= (peer.headInfo.Number-r.Config.BloomTrieConfirms)/r.Config.BloomTrieSize } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) diff --git a/les/odr_test.go b/les/odr_test.go index c7c25cbe4..e6458adf5 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/params" @@ -160,36 +159,21 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai return res } +// testOdr tests odr requests whose validation guaranteed by block headers. func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { // Assemble the test environment - peers := newPeerSet() - dist := newRequestDistributor(peers, make(chan struct{})) - rm := newRetrieveManager(peers, dist, nil) - db := ethdb.NewMemDatabase() - ldb := ethdb.NewMemDatabase() - odr := NewLesOdr(ldb, rm) - odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) - pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) - lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) - _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - select { - case <-time.After(time.Millisecond * 100): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 1 handshake error: %v", err) - } - - lpm.synchronise(lpeer) + server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true) + defer tearDown() + client.pm.synchronise(client.rPeer) test := func(expFail uint64) { - for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ { - bhash := rawdb.ReadCanonicalHash(db, i) - b1 := fn(light.NoOdr, db, pm.chainConfig, pm.blockchain.(*core.BlockChain), nil, bhash) + for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ { + bhash := rawdb.ReadCanonicalHash(server.db, i) + b1 := fn(light.NoOdr, server.db, server.pm.chainConfig, server.pm.blockchain.(*core.BlockChain), nil, bhash) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - b2 := fn(ctx, ldb, lpm.chainConfig, nil, lpm.blockchain.(*light.LightChain), bhash) + b2 := fn(ctx, client.db, client.pm.chainConfig, nil, client.pm.blockchain.(*light.LightChain), bhash) eq := bytes.Equal(b1, b2) exp := i < expFail @@ -201,21 +185,20 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { } } } - // temporarily remove peer to test odr fails // expect retrievals to fail (except genesis block) without a les peer - peers.Unregister(lpeer.id) + client.peers.Unregister(client.rPeer.id) time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(expFail) // expect all retrievals to pass - peers.Register(lpeer) + client.peers.Register(client.rPeer) time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - lpeer.lock.Lock() - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } - lpeer.lock.Unlock() + client.peers.lock.Lock() + client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true } + client.peers.lock.Unlock() test(5) // still expect all retrievals to pass, now data should be cached locally - peers.Unregister(lpeer.id) + client.peers.Unregister(client.rPeer.id) time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed test(5) } diff --git a/les/peer.go b/les/peer.go index eb7452e27..70c863c2f 100644 --- a/les/peer.go +++ b/les/peer.go @@ -19,7 +19,6 @@ package les import ( "crypto/ecdsa" - "encoding/binary" "errors" "fmt" "math/big" @@ -36,9 +35,10 @@ import ( ) var ( - errClosed = errors.New("peer set is closed") - errAlreadyRegistered = errors.New("peer is already registered") - errNotRegistered = errors.New("peer is not registered") + errClosed = errors.New("peer set is closed") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") + errInvalidHelpTrieReq = errors.New("invalid help trie request") ) const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) @@ -284,21 +284,21 @@ func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error { } // RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node. -func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error { - p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) +func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, data interface{}) error { switch p.version { case lpv1: - reqsV1 := make([]ChtReq, len(reqs)) - for i, req := range reqs { - if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 { - return fmt.Errorf("Request invalid in LES/1 mode") - } - blockNum := binary.BigEndian.Uint64(req.Key) - // convert HelperTrie request to old CHT request - reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx + 1) * (light.CHTFrequencyClient / light.CHTFrequencyServer), BlockNum: blockNum, FromLevel: req.FromLevel} + reqs, ok := data.([]ChtReq) + if !ok { + return errInvalidHelpTrieReq } - return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1) + p.Log().Debug("Fetching batch of header proofs", "count", len(reqs)) + return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) case lpv2: + reqs, ok := data.([]HelperTrieReq) + if !ok { + return errInvalidHelpTrieReq + } + p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs) default: panic(nil) diff --git a/les/request_test.go b/les/request_test.go index db576798b..f02c2a3d7 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" ) @@ -84,35 +83,17 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment - peers := newPeerSet() - dist := newRequestDistributor(peers, make(chan struct{})) - rm := newRetrieveManager(peers, dist, nil) - db := ethdb.NewMemDatabase() - ldb := ethdb.NewMemDatabase() - odr := NewLesOdr(ldb, rm) - odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) - - pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) - lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) - _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - select { - case <-time.After(time.Millisecond * 100): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 1 handshake error: %v", err) - } - - lpm.synchronise(lpeer) + server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true) + defer tearDown() + client.pm.synchronise(client.rPeer) test := func(expFail uint64) { - for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ { - bhash := rawdb.ReadCanonicalHash(db, i) - if req := fn(ldb, bhash, i); req != nil { + for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ { + bhash := rawdb.ReadCanonicalHash(server.db, i) + if req := fn(client.db, bhash, i); req != nil { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - - err := odr.Retrieve(ctx, req) + err := client.pm.odr.Retrieve(ctx, req) got := err == nil exp := i < expFail if exp && !got { @@ -126,16 +107,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { } // temporarily remove peer to test odr fails - peers.Unregister(lpeer.id) + client.peers.Unregister(client.rPeer.id) time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed // expect retrievals to fail (except genesis block) without a les peer test(0) - peers.Register(lpeer) + client.peers.Register(client.rPeer) time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - lpeer.lock.Lock() - lpeer.hasBlock = func(common.Hash, uint64) bool { return true } - lpeer.lock.Unlock() + client.rPeer.lock.Lock() + client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true } + client.rPeer.lock.Unlock() // expect all retrievals to pass test(5) } diff --git a/les/server.go b/les/server.go index df98d1e3a..2fa0456d6 100644 --- a/les/server.go +++ b/les/server.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -50,7 +51,7 @@ type LesServer struct { func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { quitSync := make(chan struct{}) - pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) + pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) if err != nil { return nil, err } @@ -64,8 +65,9 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { lesCommons: lesCommons{ config: config, chainDb: eth.ChainDb(), - chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil), - bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil), + iConfig: light.DefaultServerIndexerConfig, + chtIndexer: light.NewChtIndexer(eth.ChainDb(), nil, params.CHTFrequencyServer, params.HelperTrieProcessConfirmations), + bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), protocolManager: pm, }, quitSync: quitSync, @@ -75,14 +77,14 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { logger := log.New() chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility - chtV2SectionCount := chtV1SectionCount / (light.CHTFrequencyClient / light.CHTFrequencyServer) + chtV2SectionCount := chtV1SectionCount / (params.CHTFrequencyClient / params.CHTFrequencyServer) if chtV2SectionCount != 0 { // convert to LES/2 section chtLastSection := chtV2SectionCount - 1 // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead - chtLastSectionV1 := (chtLastSection+1)*(light.CHTFrequencyClient/light.CHTFrequencyServer) - 1 + chtLastSectionV1 := (chtLastSection+1)*(params.CHTFrequencyClient/params.CHTFrequencyServer) - 1 chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1) - chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead) + chtRoot := light.GetChtRoot(pm.chainDb, chtLastSectionV1, chtSectionHead) logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot) } bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections() diff --git a/light/lightchain.go b/light/lightchain.go index b5afe1f0e..bd798eca2 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -48,6 +48,7 @@ var ( // interface. It only does header validation during chain insertion. type LightChain struct { hc *core.HeaderChain + indexerConfig *IndexerConfig chainDb ethdb.Database odr OdrBackend chainFeed event.Feed @@ -81,13 +82,14 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. blockCache, _ := lru.New(blockCacheLimit) bc := &LightChain{ - chainDb: odr.Database(), - odr: odr, - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - blockCache: blockCache, - engine: engine, + chainDb: odr.Database(), + indexerConfig: odr.IndexerConfig(), + odr: odr, + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + blockCache: blockCache, + engine: engine, } var err error bc.hc, err = core.NewHeaderChain(odr.Database(), config, bc.engine, bc.getProcInterrupt) @@ -128,7 +130,7 @@ func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) { if self.odr.BloomIndexer() != nil { self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } - log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead) + log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*self.indexerConfig.ChtSize-1, "hash", cp.SectionHead) } func (self *LightChain) getProcInterrupt() bool { @@ -472,7 +474,7 @@ func (self *LightChain) SyncCht(ctx context.Context) bool { head := self.CurrentHeader().Number.Uint64() sections, _, _ := self.odr.ChtIndexer().Sections() - latest := sections*CHTFrequencyClient - 1 + latest := sections*self.indexerConfig.ChtSize - 1 if clique := self.hc.Config().Clique; clique != nil { latest -= latest % clique.Epoch // epoch snapshot for clique } diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 5f0baaf4c..d45c0656d 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -55,7 +55,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) { db := ethdb.NewMemDatabase() gspec := core.Genesis{Config: params.TestChainConfig} genesis := gspec.MustCommit(db) - blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker()) + blockchain, _ := NewLightChain(&dummyOdr{db: db, indexerConfig: TestClientIndexerConfig}, gspec.Config, ethash.NewFaker()) // Create and inject the requested chain if n == 0 { @@ -265,7 +265,8 @@ func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types. type dummyOdr struct { OdrBackend - db ethdb.Database + db ethdb.Database + indexerConfig *IndexerConfig } func (odr *dummyOdr) Database() ethdb.Database { @@ -276,6 +277,10 @@ func (odr *dummyOdr) Retrieve(ctx context.Context, req OdrRequest) error { return nil } +func (odr *dummyOdr) IndexerConfig() *IndexerConfig { + return odr.indexerConfig +} + // Tests that reorganizing a long difficult chain after a short easy one // overwrites the canonical numbers and links in the database. func TestReorgLongHeaders(t *testing.T) { diff --git a/light/odr.go b/light/odr.go index 83c64055a..3cd8b2c04 100644 --- a/light/odr.go +++ b/light/odr.go @@ -44,6 +44,7 @@ type OdrBackend interface { BloomTrieIndexer() *core.ChainIndexer BloomIndexer() *core.ChainIndexer Retrieve(ctx context.Context, req OdrRequest) error + IndexerConfig() *IndexerConfig } // OdrRequest is an interface for retrieval requests @@ -136,6 +137,7 @@ func (req *ReceiptsRequest) StoreResult(db ethdb.Database) { // ChtRequest is the ODR request type for state/storage trie entries type ChtRequest struct { OdrRequest + Config *IndexerConfig ChtNum, BlockNum uint64 ChtRoot common.Hash Header *types.Header @@ -155,6 +157,7 @@ func (req *ChtRequest) StoreResult(db ethdb.Database) { // BloomRequest is the ODR request type for retrieving bloom filters from a CHT structure type BloomRequest struct { OdrRequest + Config *IndexerConfig BloomTrieNum uint64 BitIdx uint SectionIdxList []uint64 @@ -166,7 +169,7 @@ type BloomRequest struct { // StoreResult stores the retrieved data in local database func (req *BloomRequest) StoreResult(db ethdb.Database) { for i, sectionIdx := range req.SectionIdxList { - sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*BloomTrieFrequency-1) + sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*req.Config.BloomTrieSize-1) // if we don't have the canonical hash stored for this section head number, we'll still store it under // a key with a zero sectionHead. GetBloomBits will look there too if we still don't have the canonical // hash. In the unlikely case we've retrieved the section head hash since then, we'll just retrieve the diff --git a/light/odr_test.go b/light/odr_test.go index 3e7ac1011..eea5b1eab 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -55,8 +55,9 @@ var ( type testOdr struct { OdrBackend - sdb, ldb ethdb.Database - disable bool + indexerConfig *IndexerConfig + sdb, ldb ethdb.Database + disable bool } func (odr *testOdr) Database() ethdb.Database { @@ -92,6 +93,10 @@ func (odr *testOdr) Retrieve(ctx context.Context, req OdrRequest) error { return nil } +func (odr *testOdr) IndexerConfig() *IndexerConfig { + return odr.indexerConfig +} + type odrTestFn func(ctx context.Context, db ethdb.Database, bc *core.BlockChain, lc *LightChain, bhash common.Hash) ([]byte, error) func TestOdrGetBlockLes1(t *testing.T) { testChainOdr(t, 1, odrGetBlock) } @@ -258,7 +263,7 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { t.Fatal(err) } - odr := &testOdr{sdb: sdb, ldb: ldb} + odr := &testOdr{sdb: sdb, ldb: ldb, indexerConfig: TestClientIndexerConfig} lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) if err != nil { t.Fatal(err) diff --git a/light/odr_util.go b/light/odr_util.go index 620af6383..9bc0f604b 100644 --- a/light/odr_util.go +++ b/light/odr_util.go @@ -53,16 +53,16 @@ func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*typ for chtCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) { chtCount-- if chtCount > 0 { - sectionHeadNum = chtCount*CHTFrequencyClient - 1 + sectionHeadNum = chtCount*odr.IndexerConfig().ChtSize - 1 sectionHead = odr.ChtIndexer().SectionHead(chtCount - 1) canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum) } } } - if number >= chtCount*CHTFrequencyClient { + if number >= chtCount*odr.IndexerConfig().ChtSize { return nil, ErrNoTrustedCht } - r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number} + r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number, Config: odr.IndexerConfig()} if err := odr.Retrieve(ctx, r); err != nil { return nil, err } @@ -175,9 +175,9 @@ func GetBlockLogs(ctx context.Context, odr OdrBackend, hash common.Hash, number // GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to the given bit index and section indexes func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxList []uint64) ([][]byte, error) { - db := odr.Database() - result := make([][]byte, len(sectionIdxList)) var ( + db = odr.Database() + result = make([][]byte, len(sectionIdxList)) reqList []uint64 reqIdx []int ) @@ -193,7 +193,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi for bloomTrieCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) { bloomTrieCount-- if bloomTrieCount > 0 { - sectionHeadNum = bloomTrieCount*BloomTrieFrequency - 1 + sectionHeadNum = bloomTrieCount*odr.IndexerConfig().BloomTrieSize - 1 sectionHead = odr.BloomTrieIndexer().SectionHead(bloomTrieCount - 1) canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum) } @@ -201,7 +201,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi } for i, sectionIdx := range sectionIdxList { - sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*BloomTrieFrequency-1) + sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*odr.IndexerConfig().BloomSize-1) // if we don't have the canonical hash stored for this section head number, we'll still look for // an entry with a zero sectionHead (we store it with zero section head too if we don't know it // at the time of the retrieval) @@ -209,6 +209,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi if err == nil { result[i] = bloomBits } else { + // TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index if sectionIdx >= bloomTrieCount { return nil, ErrNoTrustedBloomTrie } @@ -220,7 +221,8 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi return result, nil } - r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1, BitIdx: bitIdx, SectionIdxList: reqList} + r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1, + BitIdx: bitIdx, SectionIdxList: reqList, Config: odr.IndexerConfig()} if err := odr.Retrieve(ctx, r); err != nil { return nil, err } else { diff --git a/light/postprocess.go b/light/postprocess.go index f105d57b5..7b23e48b5 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -36,20 +36,75 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -const ( - // CHTFrequencyClient is the block frequency for creating CHTs on the client side. - CHTFrequencyClient = 32768 +// IndexerConfig includes a set of configs for chain indexers. +type IndexerConfig struct { + // The block frequency for creating CHTs. + ChtSize uint64 - // CHTFrequencyServer is the block frequency for creating CHTs on the server side. - // Eventually this can be merged back with the client version, but that requires a - // full database upgrade, so that should be left for a suitable moment. - CHTFrequencyServer = 4096 + // A special auxiliary field represents client's chtsize for server config, otherwise represents server's chtsize. + PairChtSize uint64 - HelperTrieConfirmations = 2048 // number of confirmations before a server is expected to have the given HelperTrie available - HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated + // The number of confirmations needed to generate/accept a canonical hash help trie. + ChtConfirms uint64 + + // The block frequency for creating new bloom bits. + BloomSize uint64 + + // The number of confirmation needed before a bloom section is considered probably final and its rotated bits + // are calculated. + BloomConfirms uint64 + + // The block frequency for creating BloomTrie. + BloomTrieSize uint64 + + // The number of confirmations needed to generate/accept a bloom trie. + BloomTrieConfirms uint64 +} + +var ( + // DefaultServerIndexerConfig wraps a set of configs as a default indexer config for server side. + DefaultServerIndexerConfig = &IndexerConfig{ + ChtSize: params.CHTFrequencyServer, + PairChtSize: params.CHTFrequencyClient, + ChtConfirms: params.HelperTrieProcessConfirmations, + BloomSize: params.BloomBitsBlocks, + BloomConfirms: params.BloomConfirms, + BloomTrieSize: params.BloomTrieFrequency, + BloomTrieConfirms: params.HelperTrieProcessConfirmations, + } + // DefaultClientIndexerConfig wraps a set of configs as a default indexer config for client side. + DefaultClientIndexerConfig = &IndexerConfig{ + ChtSize: params.CHTFrequencyClient, + PairChtSize: params.CHTFrequencyServer, + ChtConfirms: params.HelperTrieConfirmations, + BloomSize: params.BloomBitsBlocksClient, + BloomConfirms: params.HelperTrieConfirmations, + BloomTrieSize: params.BloomTrieFrequency, + BloomTrieConfirms: params.HelperTrieConfirmations, + } + // TestServerIndexerConfig wraps a set of configs as a test indexer config for server side. + TestServerIndexerConfig = &IndexerConfig{ + ChtSize: 256, + PairChtSize: 2048, + ChtConfirms: 16, + BloomSize: 256, + BloomConfirms: 16, + BloomTrieSize: 2048, + BloomTrieConfirms: 16, + } + // TestClientIndexerConfig wraps a set of configs as a test indexer config for client side. + TestClientIndexerConfig = &IndexerConfig{ + ChtSize: 2048, + PairChtSize: 256, + ChtConfirms: 128, + BloomSize: 2048, + BloomConfirms: 128, + BloomTrieSize: 2048, + BloomTrieConfirms: 128, + } ) -// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with +// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with // the appropriate section index and head hash. It is used to start light syncing from this checkpoint // and avoid downloading the entire header chain while still being able to securely access old headers/logs. type TrustedCheckpoint struct { @@ -84,9 +139,9 @@ var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{ } var ( - ErrNoTrustedCht = errors.New("No trusted canonical hash trie") - ErrNoTrustedBloomTrie = errors.New("No trusted bloom trie") - ErrNoHeader = errors.New("Header not found") + ErrNoTrustedCht = errors.New("no trusted canonical hash trie") + ErrNoTrustedBloomTrie = errors.New("no trusted bloom trie") + ErrNoHeader = errors.New("header not found") chtPrefix = []byte("chtRoot-") // chtPrefix + chtNum (uint64 big endian) -> trie root hash ChtTablePrefix = "cht-" ) @@ -97,8 +152,8 @@ type ChtNode struct { Td *big.Int } -// GetChtRoot reads the CHT root assoctiated to the given section from the database -// Note that sectionIdx is specified according to LES/1 CHT section size +// GetChtRoot reads the CHT root associated to the given section from the database +// Note that sectionIdx is specified according to LES/1 CHT section size. func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash { var encNumber [8]byte binary.BigEndian.PutUint64(encNumber[:], sectionIdx) @@ -106,21 +161,15 @@ func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) c return common.BytesToHash(data) } -// GetChtV2Root reads the CHT root assoctiated to the given section from the database -// Note that sectionIdx is specified according to LES/2 CHT section size -func GetChtV2Root(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash { - return GetChtRoot(db, (sectionIdx+1)*(CHTFrequencyClient/CHTFrequencyServer)-1, sectionHead) -} - -// StoreChtRoot writes the CHT root assoctiated to the given section into the database -// Note that sectionIdx is specified according to LES/1 CHT section size +// StoreChtRoot writes the CHT root associated to the given section into the database +// Note that sectionIdx is specified according to LES/1 CHT section size. func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) { var encNumber [8]byte binary.BigEndian.PutUint64(encNumber[:], sectionIdx) db.Put(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes()) } -// ChtIndexerBackend implements core.ChainIndexerBackend +// ChtIndexerBackend implements core.ChainIndexerBackend. type ChtIndexerBackend struct { diskdb, trieTable ethdb.Database odr OdrBackend @@ -130,33 +179,24 @@ type ChtIndexerBackend struct { trie *trie.Trie } -// NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { - var sectionSize, confirmReq uint64 - if clientMode { - sectionSize = CHTFrequencyClient - confirmReq = HelperTrieConfirmations - } else { - sectionSize = CHTFrequencyServer - confirmReq = HelperTrieProcessConfirmations - } - idb := ethdb.NewTable(db, "chtIndex-") +// NewChtIndexer creates a Cht chain indexer +func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64) *core.ChainIndexer { trieTable := ethdb.NewTable(db, ChtTablePrefix) backend := &ChtIndexerBackend{ diskdb: db, odr: odr, trieTable: trieTable, triedb: trie.NewDatabase(trieTable), - sectionSize: sectionSize, + sectionSize: size, } - return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht") + return core.NewChainIndexer(db, ethdb.NewTable(db, "chtIndex-"), backend, size, confirms, time.Millisecond*100, "cht") } // fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the // ODR backend in order to be able to add new entries and calculate subsequent root hashes func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { batch := c.trieTable.NewBatch() - r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1} + r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1, Config: c.odr.IndexerConfig()} for { err := c.odr.Retrieve(ctx, r) switch err { @@ -221,18 +261,13 @@ func (c *ChtIndexerBackend) Commit() error { } c.triedb.Commit(root, false) - if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 { - log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) + if ((c.section+1)*c.sectionSize)%params.CHTFrequencyClient == 0 { + log.Info("Storing CHT", "section", c.section*c.sectionSize/params.CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) } StoreChtRoot(c.diskdb, c.section, c.lastHash, root) return nil } -const ( - BloomTrieFrequency = 32768 - ethBloomBitsSection = 4096 -) - var ( bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash BloomTrieTablePrefix = "blt-" @@ -255,33 +290,31 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root // BloomTrieIndexerBackend implements core.ChainIndexerBackend type BloomTrieIndexerBackend struct { - diskdb, trieTable ethdb.Database - odr OdrBackend - triedb *trie.Database - section, parentSectionSize, bloomTrieRatio uint64 - trie *trie.Trie - sectionHeads []common.Hash + diskdb, trieTable ethdb.Database + triedb *trie.Database + odr OdrBackend + section uint64 + parentSize uint64 + size uint64 + bloomTrieRatio uint64 + trie *trie.Trie + sectionHeads []common.Hash } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { +func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64) *core.ChainIndexer { trieTable := ethdb.NewTable(db, BloomTrieTablePrefix) backend := &BloomTrieIndexerBackend{ - diskdb: db, - odr: odr, - trieTable: trieTable, - triedb: trie.NewDatabase(trieTable), + diskdb: db, + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), + parentSize: parentSize, + size: size, } - idb := ethdb.NewTable(db, "bltIndex-") - - if clientMode { - backend.parentSectionSize = BloomTrieFrequency - } else { - backend.parentSectionSize = ethBloomBitsSection - } - backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize + backend.bloomTrieRatio = size / parentSize backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio) - return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie") + return core.NewChainIndexer(db, ethdb.NewTable(db, "bltIndex-"), backend, size, 0, time.Millisecond*100, "bloomtrie") } // fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the @@ -296,7 +329,7 @@ func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section for i := 0; i < 20; i++ { go func() { for bitIndex := range indexCh { - r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}} + r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}, Config: b.odr.IndexerConfig()} for { if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers { // if there are no peers to serve, retry later @@ -351,9 +384,9 @@ func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, las // Process implements core.ChainIndexerBackend func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error { - num := header.Number.Uint64() - b.section*BloomTrieFrequency - if (num+1)%b.parentSectionSize == 0 { - b.sectionHeads[num/b.parentSectionSize] = header.Hash() + num := header.Number.Uint64() - b.section*b.size + if (num+1)%b.parentSize == 0 { + b.sectionHeads[num/b.parentSize] = header.Hash() } return nil } @@ -372,7 +405,7 @@ func (b *BloomTrieIndexerBackend) Commit() error { if err != nil { return err } - decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSectionSize/8)) + decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSize/8)) if err2 != nil { return err2 } @@ -397,6 +430,5 @@ func (b *BloomTrieIndexerBackend) Commit() error { sectionHead := b.sectionHeads[b.bloomTrieRatio-1] log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize)) StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root) - return nil } diff --git a/light/trie_test.go b/light/trie_test.go index 84c6f162f..6bddfefe2 100644 --- a/light/trie_test.go +++ b/light/trie_test.go @@ -47,7 +47,7 @@ func TestNodeIterator(t *testing.T) { } ctx := context.Background() - odr := &testOdr{sdb: fulldb, ldb: lightdb} + odr := &testOdr{sdb: fulldb, ldb: lightdb, indexerConfig: TestClientIndexerConfig} head := blockchain.CurrentHeader() lightTrie, _ := NewStateDatabase(ctx, head, odr).OpenTrie(head.Root) fullTrie, _ := state.NewDatabase(fulldb).OpenTrie(head.Root) diff --git a/light/txpool_test.go b/light/txpool_test.go index ccbd83a94..204347a6e 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -94,7 +94,7 @@ func TestTxPool(t *testing.T) { panic(err) } - odr := &testOdr{sdb: sdb, ldb: ldb} + odr := &testOdr{sdb: sdb, ldb: ldb, indexerConfig: TestClientIndexerConfig} relay := &testTxRelay{ send: make(chan int, 1), discard: make(chan int, 1), diff --git a/params/network_params.go b/params/network_params.go index 536a46d3d..f8731e897 100644 --- a/params/network_params.go +++ b/params/network_params.go @@ -17,10 +17,38 @@ package params // These are network parameters that need to be constant between clients, but -// aren't necesarilly consensus related. +// aren't necessarily consensus related. const ( // BloomBitsBlocks is the number of blocks a single bloom bit section vector - // contains. + // contains on the server side. BloomBitsBlocks uint64 = 4096 + + // BloomBitsBlocksClient is the number of blocks a single bloom bit section vector + // contains on the light client side + BloomBitsBlocksClient uint64 = 32768 + + // BloomConfirms is the number of confirmation blocks before a bloom section is + // considered probably final and its rotated bits are calculated. + BloomConfirms = 256 + + // CHTFrequencyClient is the block frequency for creating CHTs on the client side. + CHTFrequencyClient = 32768 + + // CHTFrequencyServer is the block frequency for creating CHTs on the server side. + // Eventually this can be merged back with the client version, but that requires a + // full database upgrade, so that should be left for a suitable moment. + CHTFrequencyServer = 4096 + + // BloomTrieFrequency is the block frequency for creating BloomTrie on both + // server/client sides. + BloomTrieFrequency = 32768 + + // HelperTrieConfirmations is the number of confirmations before a client is expected + // to have the given HelperTrie available. + HelperTrieConfirmations = 2048 + + // HelperTrieProcessConfirmations is the number of confirmations before a HelperTrie + // is generated + HelperTrieProcessConfirmations = 256 )