From 07e85d8e146c0aa08fcfe0871d1e9b69487e67ea Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 22 Apr 2015 12:46:41 +0200 Subject: [PATCH 1/4] Moved leveldb update loop to eth/backend --- common/db.go | 1 + eth/backend.go | 65 +++++++++++++++++++++++++++++----------- ethdb/database.go | 28 ++--------------- ethdb/memory_database.go | 4 +++ 4 files changed, 56 insertions(+), 42 deletions(-) diff --git a/common/db.go b/common/db.go index 408b1e755..ae13c7557 100644 --- a/common/db.go +++ b/common/db.go @@ -7,4 +7,5 @@ type Database interface { Delete(key []byte) error LastKnownTD() []byte Close() + Flush() error } diff --git a/eth/backend.go b/eth/backend.go index 88456e448..18c3397c8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -6,6 +6,7 @@ import ( "math" "path" "strings" + "time" "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" @@ -124,6 +125,8 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) + // Closed when databases are flushed and closed + databasesClosed chan bool //*** SERVICES *** // State manager for processing new blocks and managing the over all states @@ -199,18 +202,19 @@ func New(config *Config) (*Ethereum, error) { glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion) eth := &Ethereum{ - shutdownChan: make(chan bool), - blockDb: blockDb, - stateDb: stateDb, - extraDb: extraDb, - eventMux: &event.TypeMux{}, - accountManager: config.AccountManager, - DataDir: config.DataDir, - etherbase: common.HexToAddress(config.Etherbase), - clientVersion: config.Name, // TODO should separate from Name - ethVersionId: config.ProtocolVersion, - netVersionId: config.NetworkId, - NatSpec: config.NatSpec, + shutdownChan: make(chan bool), + databasesClosed: make(chan bool), + blockDb: blockDb, + stateDb: stateDb, + extraDb: extraDb, + eventMux: &event.TypeMux{}, + accountManager: config.AccountManager, + DataDir: config.DataDir, + etherbase: common.HexToAddress(config.Etherbase), + clientVersion: config.Name, // TODO should separate from Name + ethVersionId: config.ProtocolVersion, + netVersionId: config.NetworkId, + NatSpec: config.NatSpec, } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) @@ -378,6 +382,9 @@ func (s *Ethereum) Start() error { } } + // periodically flush databases + go s.syncDatabases() + // Start services s.txPool.Start() @@ -397,6 +404,34 @@ func (s *Ethereum) Start() error { return nil } +func (s *Ethereum) syncDatabases() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + // don't change the order of database flushes + if err := s.extraDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err) + } + if err := s.stateDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err) + } + if err := s.blockDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err) + } + case <-s.shutdownChan: + break done + } + } + + s.blockDb.Close() + s.stateDb.Close() + s.extraDb.Close() + + close(s.databasesClosed) +} + func (s *Ethereum) StartForTest() { jsonlogger.LogJson(&logger.LogStarting{ ClientString: s.net.Name, @@ -417,11 +452,6 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - // Close the database - defer s.blockDb.Close() - defer s.stateDb.Close() - defer s.extraDb.Close() - s.txSub.Unsubscribe() // quits txBroadcastLoop s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop @@ -437,6 +467,7 @@ func (s *Ethereum) Stop() { // This function will wait for a shutdown and resumes main thread execution func (s *Ethereum) WaitForShutdown() { + <-s.databasesClosed <-s.shutdownChan } diff --git a/ethdb/database.go b/ethdb/database.go index eb562f852..57a3f9ee6 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -2,7 +2,6 @@ package ethdb import ( "sync" - "time" "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/logger" @@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { } database.makeQueue() - go database.update() - return database, nil } @@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error { } self.makeQueue() // reset the queue + glog.V(logger.Detail).Infoln("Flush database: ", self.fn) + return self.db.Write(batch, nil) } func (self *LDBDatabase) Close() { - self.quit <- struct{}{} - <-self.quit - glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) -} - -func (self *LDBDatabase) update() { - ticker := time.NewTicker(1 * time.Minute) -done: - for { - select { - case <-ticker.C: - if err := self.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) - } - case <-self.quit: - break done - } - } - if err := self.Flush(); err != nil { glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) } - // Close the leveldb database self.db.Close() - - self.quit <- struct{}{} + glog.V(logger.Error).Infoln("flushed and closed db:", self.fn) } diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go index d4988d0d8..f5d5faee7 100644 --- a/ethdb/memory_database.go +++ b/ethdb/memory_database.go @@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte { return data } + +func (db *MemDatabase) Flush() error { + return nil +} From c9e22976f59f788a9776beab346e7dcf25af48ac Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 22 Apr 2015 12:50:33 +0200 Subject: [PATCH 2/4] change order of block insert and update LastBlock --- core/chain_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index 1df56b27f..e5fcd4afc 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error { } func (bc *ChainManager) insert(block *types.Block) { - bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) - bc.currentBlock = block - bc.lastBlockHash = block.Hash() - key := append(blockNumPre, block.Number().Bytes()...) bc.blockDb.Put(key, bc.lastBlockHash.Bytes()) // Push block to cache bc.cache.Push(block) + + bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) + bc.currentBlock = block + bc.lastBlockHash = block.Hash() } func (bc *ChainManager) write(block *types.Block) { From 5cfa0e918799f296580d01093b2e3ec921b93ba4 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 23 Apr 2015 17:35:05 +0200 Subject: [PATCH 3/4] bugfix, wrong hash stored in blockDb --- core/chain_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chain_manager.go b/core/chain_manager.go index e5fcd4afc..0003a6011 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -343,7 +343,7 @@ func (self *ChainManager) Export(w io.Writer) error { func (bc *ChainManager) insert(block *types.Block) { key := append(blockNumPre, block.Number().Bytes()...) - bc.blockDb.Put(key, bc.lastBlockHash.Bytes()) + bc.blockDb.Put(key, block.Hash().Bytes()) // Push block to cache bc.cache.Push(block) From c273ed7d82f3d5beab7c213fbe1f5c0942adf0bd Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 22 Apr 2015 12:46:41 +0200 Subject: [PATCH 4/4] Moved leveldb update loop to eth/backend change order of block insert and update LastBlock bugfix, wrong hash stored in blockDb --- common/db.go | 1 + core/chain_manager.go | 10 +++--- eth/backend.go | 67 +++++++++++++++++++++++++++++----------- ethdb/database.go | 28 ++--------------- ethdb/memory_database.go | 4 +++ 5 files changed, 62 insertions(+), 48 deletions(-) diff --git a/common/db.go b/common/db.go index 408b1e755..ae13c7557 100644 --- a/common/db.go +++ b/common/db.go @@ -7,4 +7,5 @@ type Database interface { Delete(key []byte) error LastKnownTD() []byte Close() + Flush() error } diff --git a/core/chain_manager.go b/core/chain_manager.go index 47f84b80a..a09b2e63b 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error { } func (bc *ChainManager) insert(block *types.Block) { + key := append(blockNumPre, block.Number().Bytes()...) + bc.blockDb.Put(key, block.Hash().Bytes()) + // Push block to cache + bc.cache.Push(block) + bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) bc.currentBlock = block bc.lastBlockHash = block.Hash() - - key := append(blockNumPre, block.Number().Bytes()...) - bc.blockDb.Put(key, bc.lastBlockHash.Bytes()) - // Push block to cache - bc.cache.Push(block) } func (bc *ChainManager) write(block *types.Block) { diff --git a/eth/backend.go b/eth/backend.go index 646a4eaf2..0e671f1bf 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "strings" + "time" "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" @@ -123,6 +124,8 @@ type Ethereum struct { blockDb common.Database // Block chain database stateDb common.Database // State changes database extraDb common.Database // Extra database (txs, etc) + // Closed when databases are flushed and closed + databasesClosed chan bool //*** SERVICES *** // State manager for processing new blocks and managing the over all states @@ -197,18 +200,19 @@ func New(config *Config) (*Ethereum, error) { glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion) eth := &Ethereum{ - shutdownChan: make(chan bool), - blockDb: blockDb, - stateDb: stateDb, - extraDb: extraDb, - eventMux: &event.TypeMux{}, - accountManager: config.AccountManager, - DataDir: config.DataDir, - etherbase: common.HexToAddress(config.Etherbase), - clientVersion: config.Name, // TODO should separate from Name - ethVersionId: config.ProtocolVersion, - netVersionId: config.NetworkId, - NatSpec: config.NatSpec, + shutdownChan: make(chan bool), + databasesClosed: make(chan bool), + blockDb: blockDb, + stateDb: stateDb, + extraDb: extraDb, + eventMux: &event.TypeMux{}, + accountManager: config.AccountManager, + DataDir: config.DataDir, + etherbase: common.HexToAddress(config.Etherbase), + clientVersion: config.Name, // TODO should separate from Name + ethVersionId: config.ProtocolVersion, + netVersionId: config.NetworkId, + NatSpec: config.NatSpec, } eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) @@ -376,6 +380,9 @@ func (s *Ethereum) Start() error { } } + // periodically flush databases + go s.syncDatabases() + // Start services go s.txPool.Start() s.protocolManager.Start() @@ -392,6 +399,34 @@ func (s *Ethereum) Start() error { return nil } +func (s *Ethereum) syncDatabases() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + // don't change the order of database flushes + if err := s.extraDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err) + } + if err := s.stateDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err) + } + if err := s.blockDb.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err) + } + case <-s.shutdownChan: + break done + } + } + + s.blockDb.Close() + s.stateDb.Close() + s.extraDb.Close() + + close(s.databasesClosed) +} + func (s *Ethereum) StartForTest() { jsonlogger.LogJson(&logger.LogStarting{ ClientString: s.net.Name, @@ -412,12 +447,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error { } func (s *Ethereum) Stop() { - // Close the database - defer s.blockDb.Close() - defer s.stateDb.Close() - defer s.extraDb.Close() - - s.txSub.Unsubscribe() // quits txBroadcastLoop + s.txSub.Unsubscribe() // quits txBroadcastLoop s.protocolManager.Stop() s.txPool.Stop() @@ -432,6 +462,7 @@ func (s *Ethereum) Stop() { // This function will wait for a shutdown and resumes main thread execution func (s *Ethereum) WaitForShutdown() { + <-s.databasesClosed <-s.shutdownChan } diff --git a/ethdb/database.go b/ethdb/database.go index eb562f852..57a3f9ee6 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -2,7 +2,6 @@ package ethdb import ( "sync" - "time" "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/logger" @@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { } database.makeQueue() - go database.update() - return database, nil } @@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error { } self.makeQueue() // reset the queue + glog.V(logger.Detail).Infoln("Flush database: ", self.fn) + return self.db.Write(batch, nil) } func (self *LDBDatabase) Close() { - self.quit <- struct{}{} - <-self.quit - glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) -} - -func (self *LDBDatabase) update() { - ticker := time.NewTicker(1 * time.Minute) -done: - for { - select { - case <-ticker.C: - if err := self.Flush(); err != nil { - glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) - } - case <-self.quit: - break done - } - } - if err := self.Flush(); err != nil { glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) } - // Close the leveldb database self.db.Close() - - self.quit <- struct{}{} + glog.V(logger.Error).Infoln("flushed and closed db:", self.fn) } diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go index d4988d0d8..f5d5faee7 100644 --- a/ethdb/memory_database.go +++ b/ethdb/memory_database.go @@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte { return data } + +func (db *MemDatabase) Flush() error { + return nil +}