Merge pull request #778 from bas-vk/issue764

Moved database update loop to eth/backend
This commit is contained in:
Jeffrey Wilcke 2015-04-23 11:46:27 -07:00
commit f5e0388f62
5 changed files with 62 additions and 48 deletions

View File

@ -7,4 +7,5 @@ type Database interface {
Delete(key []byte) error Delete(key []byte) error
LastKnownTD() []byte LastKnownTD() []byte
Close() Close()
Flush() error
} }

View File

@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error {
} }
func (bc *ChainManager) insert(block *types.Block) { func (bc *ChainManager) insert(block *types.Block) {
key := append(blockNumPre, block.Number().Bytes()...)
bc.blockDb.Put(key, block.Hash().Bytes())
// Push block to cache
bc.cache.Push(block)
bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
bc.currentBlock = block bc.currentBlock = block
bc.lastBlockHash = block.Hash() bc.lastBlockHash = block.Hash()
key := append(blockNumPre, block.Number().Bytes()...)
bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
// Push block to cache
bc.cache.Push(block)
} }
func (bc *ChainManager) write(block *types.Block) { func (bc *ChainManager) write(block *types.Block) {

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"path" "path"
"strings" "strings"
"time"
"github.com/ethereum/ethash" "github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
@ -123,6 +124,8 @@ type Ethereum struct {
blockDb common.Database // Block chain database blockDb common.Database // Block chain database
stateDb common.Database // State changes database stateDb common.Database // State changes database
extraDb common.Database // Extra database (txs, etc) extraDb common.Database // Extra database (txs, etc)
// Closed when databases are flushed and closed
databasesClosed chan bool
//*** SERVICES *** //*** SERVICES ***
// State manager for processing new blocks and managing the over all states // State manager for processing new blocks and managing the over all states
@ -198,6 +201,7 @@ func New(config *Config) (*Ethereum, error) {
eth := &Ethereum{ eth := &Ethereum{
shutdownChan: make(chan bool), shutdownChan: make(chan bool),
databasesClosed: make(chan bool),
blockDb: blockDb, blockDb: blockDb,
stateDb: stateDb, stateDb: stateDb,
extraDb: extraDb, extraDb: extraDb,
@ -376,6 +380,9 @@ func (s *Ethereum) Start() error {
} }
} }
// periodically flush databases
go s.syncDatabases()
// Start services // Start services
go s.txPool.Start() go s.txPool.Start()
s.protocolManager.Start() s.protocolManager.Start()
@ -392,6 +399,34 @@ func (s *Ethereum) Start() error {
return nil return nil
} }
func (s *Ethereum) syncDatabases() {
ticker := time.NewTicker(1 * time.Minute)
done:
for {
select {
case <-ticker.C:
// don't change the order of database flushes
if err := s.extraDb.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
}
if err := s.stateDb.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
}
if err := s.blockDb.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
}
case <-s.shutdownChan:
break done
}
}
s.blockDb.Close()
s.stateDb.Close()
s.extraDb.Close()
close(s.databasesClosed)
}
func (s *Ethereum) StartForTest() { func (s *Ethereum) StartForTest() {
jsonlogger.LogJson(&logger.LogStarting{ jsonlogger.LogJson(&logger.LogStarting{
ClientString: s.net.Name, ClientString: s.net.Name,
@ -412,11 +447,6 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
} }
func (s *Ethereum) Stop() { func (s *Ethereum) Stop() {
// Close the database
defer s.blockDb.Close()
defer s.stateDb.Close()
defer s.extraDb.Close()
s.txSub.Unsubscribe() // quits txBroadcastLoop s.txSub.Unsubscribe() // quits txBroadcastLoop
s.protocolManager.Stop() s.protocolManager.Stop()
@ -432,6 +462,7 @@ func (s *Ethereum) Stop() {
// This function will wait for a shutdown and resumes main thread execution // This function will wait for a shutdown and resumes main thread execution
func (s *Ethereum) WaitForShutdown() { func (s *Ethereum) WaitForShutdown() {
<-s.databasesClosed
<-s.shutdownChan <-s.shutdownChan
} }

View File

@ -2,7 +2,6 @@ package ethdb
import ( import (
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
} }
database.makeQueue() database.makeQueue()
go database.update()
return database, nil return database, nil
} }
@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
} }
self.makeQueue() // reset the queue self.makeQueue() // reset the queue
glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
return self.db.Write(batch, nil) return self.db.Write(batch, nil)
} }
func (self *LDBDatabase) Close() { func (self *LDBDatabase) Close() {
self.quit <- struct{}{}
<-self.quit
glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
}
func (self *LDBDatabase) update() {
ticker := time.NewTicker(1 * time.Minute)
done:
for {
select {
case <-ticker.C:
if err := self.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
}
case <-self.quit:
break done
}
}
if err := self.Flush(); err != nil { if err := self.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
} }
// Close the leveldb database
self.db.Close() self.db.Close()
glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
self.quit <- struct{}{}
} }

View File

@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
return data return data
} }
func (db *MemDatabase) Flush() error {
return nil
}