From 7f32a08b6095bb5f1ff58168be70326ee0c29266 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 7 Apr 2015 22:19:01 +0200 Subject: [PATCH] Queued level db writes and batch writes. Closes #647 --- common/db.go | 2 - ethdb/database.go | 117 ++++++++++++++++++++++++++++++----------- ethdb/database_test.go | 27 ++++------ 3 files changed, 97 insertions(+), 49 deletions(-) diff --git a/common/db.go b/common/db.go index 6505e61c6..408b1e755 100644 --- a/common/db.go +++ b/common/db.go @@ -4,9 +4,7 @@ package common type Database interface { Put(key []byte, value []byte) Get(key []byte) ([]byte, error) - //GetKeys() []*Key Delete(key []byte) error LastKnownTD() []byte Close() - Print() } diff --git a/ethdb/database.go b/ethdb/database.go index cc2df5fa0..eb562f852 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -1,17 +1,25 @@ package ethdb import ( - "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/compression/rle" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) type LDBDatabase struct { - db *leveldb.DB - comp bool + fn string + + mu sync.Mutex + db *leveldb.DB + + queue map[string][]byte + + quit chan struct{} } func NewLDBDatabase(file string) (*LDBDatabase, error) { @@ -20,35 +28,61 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { if err != nil { return nil, err } - database := &LDBDatabase{db: db, comp: true} + database := &LDBDatabase{ + fn: file, + db: db, + quit: make(chan struct{}), + } + database.makeQueue() + + go database.update() + return database, nil } -func (self *LDBDatabase) Put(key []byte, value []byte) { - if self.comp { - value = rle.Compress(value) - } +func (self *LDBDatabase) makeQueue() { + self.queue = make(map[string][]byte) +} - err := self.db.Put(key, value, nil) - if err != nil { - fmt.Println("Error put", err) - } +func (self *LDBDatabase) Put(key []byte, value []byte) { + self.mu.Lock() + defer self.mu.Unlock() + + self.queue[string(key)] = value + /* + value = rle.Compress(value) + + err := self.db.Put(key, value, nil) + if err != nil { + fmt.Println("Error put", err) + } + */ } func (self *LDBDatabase) Get(key []byte) ([]byte, error) { + self.mu.Lock() + defer self.mu.Unlock() + + // Check queue first + if dat, ok := self.queue[string(key)]; ok { + return dat, nil + } + dat, err := self.db.Get(key, nil) if err != nil { return nil, err } - if self.comp { - return rle.Decompress(dat) - } - - return dat, nil + return rle.Decompress(dat) } func (self *LDBDatabase) Delete(key []byte) error { + self.mu.Lock() + defer self.mu.Unlock() + + // make sure it's not in the queue + delete(self.queue, string(key)) + return self.db.Delete(key, nil) } @@ -66,23 +100,46 @@ func (self *LDBDatabase) NewIterator() iterator.Iterator { return self.db.NewIterator(nil, nil) } -func (self *LDBDatabase) Write(batch *leveldb.Batch) error { +func (self *LDBDatabase) Flush() error { + self.mu.Lock() + defer self.mu.Unlock() + + batch := new(leveldb.Batch) + + for key, value := range self.queue { + batch.Put([]byte(key), rle.Compress(value)) + } + self.makeQueue() // reset the queue + return self.db.Write(batch, nil) } func (self *LDBDatabase) Close() { + self.quit <- struct{}{} + <-self.quit + glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) +} + +func (self *LDBDatabase) update() { + ticker := time.NewTicker(1 * time.Minute) +done: + for { + select { + case <-ticker.C: + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) + } + case <-self.quit: + break done + } + } + + if err := self.Flush(); err != nil { + glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) + } + // Close the leveldb database self.db.Close() -} -func (self *LDBDatabase) Print() { - iter := self.db.NewIterator(nil, nil) - for iter.Next() { - key := iter.Key() - value := iter.Value() - - fmt.Printf("%x(%d): ", key, len(key)) - node := common.NewValueFromBytes(value) - fmt.Printf("%v\n", node) - } + self.quit <- struct{}{} } diff --git a/ethdb/database_test.go b/ethdb/database_test.go index 7de30fd81..3cead8bcd 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -1,26 +1,19 @@ package ethdb -/* import ( - "bytes" - "testing" + "os" + "path" + + "github.com/ethereum/go-ethereum/common" ) -func TestCompression(t *testing.T) { - db, err := NewLDBDatabase("testdb") - if err != nil { - t.Fatal(err) +func newDb() *LDBDatabase { + file := path.Join("/", "tmp", "ldbtesttmpfile") + if common.FileExist(file) { + os.RemoveAll(file) } - in := make([]byte, 10) - db.Put([]byte("test1"), in) - out, err := db.Get([]byte("test1")) - if err != nil { - t.Fatal(err) - } + db, _ := NewLDBDatabase(file) - if bytes.Compare(out, in) != 0 { - t.Error("put get", in, out) - } + return db } -*/