Moved leveldb update loop to eth/backend
This commit is contained in:
parent
4ddbf81e74
commit
07e85d8e14
@ -7,4 +7,5 @@ type Database interface {
|
||||
Delete(key []byte) error
|
||||
LastKnownTD() []byte
|
||||
Close()
|
||||
Flush() error
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"math"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/ethash"
|
||||
"github.com/ethereum/go-ethereum/accounts"
|
||||
@ -124,6 +125,8 @@ type Ethereum struct {
|
||||
blockDb common.Database // Block chain database
|
||||
stateDb common.Database // State changes database
|
||||
extraDb common.Database // Extra database (txs, etc)
|
||||
// Closed when databases are flushed and closed
|
||||
databasesClosed chan bool
|
||||
|
||||
//*** SERVICES ***
|
||||
// State manager for processing new blocks and managing the over all states
|
||||
@ -200,6 +203,7 @@ func New(config *Config) (*Ethereum, error) {
|
||||
|
||||
eth := &Ethereum{
|
||||
shutdownChan: make(chan bool),
|
||||
databasesClosed: make(chan bool),
|
||||
blockDb: blockDb,
|
||||
stateDb: stateDb,
|
||||
extraDb: extraDb,
|
||||
@ -378,6 +382,9 @@ func (s *Ethereum) Start() error {
|
||||
}
|
||||
}
|
||||
|
||||
// periodically flush databases
|
||||
go s.syncDatabases()
|
||||
|
||||
// Start services
|
||||
s.txPool.Start()
|
||||
|
||||
@ -397,6 +404,34 @@ func (s *Ethereum) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Ethereum) syncDatabases() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
done:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// don't change the order of database flushes
|
||||
if err := s.extraDb.Flush(); err != nil {
|
||||
glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
|
||||
}
|
||||
if err := s.stateDb.Flush(); err != nil {
|
||||
glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
|
||||
}
|
||||
if err := s.blockDb.Flush(); err != nil {
|
||||
glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
|
||||
}
|
||||
case <-s.shutdownChan:
|
||||
break done
|
||||
}
|
||||
}
|
||||
|
||||
s.blockDb.Close()
|
||||
s.stateDb.Close()
|
||||
s.extraDb.Close()
|
||||
|
||||
close(s.databasesClosed)
|
||||
}
|
||||
|
||||
func (s *Ethereum) StartForTest() {
|
||||
jsonlogger.LogJson(&logger.LogStarting{
|
||||
ClientString: s.net.Name,
|
||||
@ -417,11 +452,6 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
|
||||
}
|
||||
|
||||
func (s *Ethereum) Stop() {
|
||||
// Close the database
|
||||
defer s.blockDb.Close()
|
||||
defer s.stateDb.Close()
|
||||
defer s.extraDb.Close()
|
||||
|
||||
s.txSub.Unsubscribe() // quits txBroadcastLoop
|
||||
s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
|
||||
|
||||
@ -437,6 +467,7 @@ func (s *Ethereum) Stop() {
|
||||
|
||||
// This function will wait for a shutdown and resumes main thread execution
|
||||
func (s *Ethereum) WaitForShutdown() {
|
||||
<-s.databasesClosed
|
||||
<-s.shutdownChan
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,6 @@ package ethdb
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/compression/rle"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
|
||||
}
|
||||
database.makeQueue()
|
||||
|
||||
go database.update()
|
||||
|
||||
return database, nil
|
||||
}
|
||||
|
||||
@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
|
||||
}
|
||||
self.makeQueue() // reset the queue
|
||||
|
||||
glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
|
||||
|
||||
return self.db.Write(batch, nil)
|
||||
}
|
||||
|
||||
func (self *LDBDatabase) Close() {
|
||||
self.quit <- struct{}{}
|
||||
<-self.quit
|
||||
glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
|
||||
}
|
||||
|
||||
func (self *LDBDatabase) update() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
done:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := self.Flush(); err != nil {
|
||||
glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
|
||||
}
|
||||
case <-self.quit:
|
||||
break done
|
||||
}
|
||||
}
|
||||
|
||||
if err := self.Flush(); err != nil {
|
||||
glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
|
||||
}
|
||||
|
||||
// Close the leveldb database
|
||||
self.db.Close()
|
||||
|
||||
self.quit <- struct{}{}
|
||||
glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
|
||||
}
|
||||
|
@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
func (db *MemDatabase) Flush() error {
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user