Queued level db writes and batch writes. Closes #647
This commit is contained in:
		
							parent
							
								
									758205b187
								
							
						
					
					
						commit
						7f32a08b60
					
				| @ -4,9 +4,7 @@ package common | ||||
| type Database interface { | ||||
| 	Put(key []byte, value []byte) | ||||
| 	Get(key []byte) ([]byte, error) | ||||
| 	//GetKeys() []*Key
 | ||||
| 	Delete(key []byte) error | ||||
| 	LastKnownTD() []byte | ||||
| 	Close() | ||||
| 	Print() | ||||
| } | ||||
|  | ||||
| @ -1,17 +1,25 @@ | ||||
| package ethdb | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/compression/rle" | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/logger" | ||||
| 	"github.com/ethereum/go-ethereum/logger/glog" | ||||
| 	"github.com/syndtr/goleveldb/leveldb" | ||||
| 	"github.com/syndtr/goleveldb/leveldb/iterator" | ||||
| ) | ||||
| 
 | ||||
| type LDBDatabase struct { | ||||
| 	db   *leveldb.DB | ||||
| 	comp bool | ||||
| 	fn string | ||||
| 
 | ||||
| 	mu sync.Mutex | ||||
| 	db *leveldb.DB | ||||
| 
 | ||||
| 	queue map[string][]byte | ||||
| 
 | ||||
| 	quit chan struct{} | ||||
| } | ||||
| 
 | ||||
| func NewLDBDatabase(file string) (*LDBDatabase, error) { | ||||
| @ -20,35 +28,61 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) { | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	database := &LDBDatabase{db: db, comp: true} | ||||
| 	database := &LDBDatabase{ | ||||
| 		fn:   file, | ||||
| 		db:   db, | ||||
| 		quit: make(chan struct{}), | ||||
| 	} | ||||
| 	database.makeQueue() | ||||
| 
 | ||||
| 	go database.update() | ||||
| 
 | ||||
| 	return database, nil | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Put(key []byte, value []byte) { | ||||
| 	if self.comp { | ||||
| 		value = rle.Compress(value) | ||||
| 	} | ||||
| func (self *LDBDatabase) makeQueue() { | ||||
| 	self.queue = make(map[string][]byte) | ||||
| } | ||||
| 
 | ||||
| 	err := self.db.Put(key, value, nil) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("Error put", err) | ||||
| 	} | ||||
| func (self *LDBDatabase) Put(key []byte, value []byte) { | ||||
| 	self.mu.Lock() | ||||
| 	defer self.mu.Unlock() | ||||
| 
 | ||||
| 	self.queue[string(key)] = value | ||||
| 	/* | ||||
| 		value = rle.Compress(value) | ||||
| 
 | ||||
| 		err := self.db.Put(key, value, nil) | ||||
| 		if err != nil { | ||||
| 			fmt.Println("Error put", err) | ||||
| 		} | ||||
| 	*/ | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Get(key []byte) ([]byte, error) { | ||||
| 	self.mu.Lock() | ||||
| 	defer self.mu.Unlock() | ||||
| 
 | ||||
| 	// Check queue first
 | ||||
| 	if dat, ok := self.queue[string(key)]; ok { | ||||
| 		return dat, nil | ||||
| 	} | ||||
| 
 | ||||
| 	dat, err := self.db.Get(key, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if self.comp { | ||||
| 		return rle.Decompress(dat) | ||||
| 	} | ||||
| 
 | ||||
| 	return dat, nil | ||||
| 	return rle.Decompress(dat) | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Delete(key []byte) error { | ||||
| 	self.mu.Lock() | ||||
| 	defer self.mu.Unlock() | ||||
| 
 | ||||
| 	// make sure it's not in the queue
 | ||||
| 	delete(self.queue, string(key)) | ||||
| 
 | ||||
| 	return self.db.Delete(key, nil) | ||||
| } | ||||
| 
 | ||||
| @ -66,23 +100,46 @@ func (self *LDBDatabase) NewIterator() iterator.Iterator { | ||||
| 	return self.db.NewIterator(nil, nil) | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Write(batch *leveldb.Batch) error { | ||||
| func (self *LDBDatabase) Flush() error { | ||||
| 	self.mu.Lock() | ||||
| 	defer self.mu.Unlock() | ||||
| 
 | ||||
| 	batch := new(leveldb.Batch) | ||||
| 
 | ||||
| 	for key, value := range self.queue { | ||||
| 		batch.Put([]byte(key), rle.Compress(value)) | ||||
| 	} | ||||
| 	self.makeQueue() // reset the queue
 | ||||
| 
 | ||||
| 	return self.db.Write(batch, nil) | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Close() { | ||||
| 	self.quit <- struct{}{} | ||||
| 	<-self.quit | ||||
| 	glog.V(logger.Info).Infoln("flushed and closed db:", self.fn) | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) update() { | ||||
| 	ticker := time.NewTicker(1 * time.Minute) | ||||
| done: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ticker.C: | ||||
| 			if err := self.Flush(); err != nil { | ||||
| 				glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) | ||||
| 			} | ||||
| 		case <-self.quit: | ||||
| 			break done | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err := self.Flush(); err != nil { | ||||
| 		glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Close the leveldb database
 | ||||
| 	self.db.Close() | ||||
| } | ||||
| 
 | ||||
| func (self *LDBDatabase) Print() { | ||||
| 	iter := self.db.NewIterator(nil, nil) | ||||
| 	for iter.Next() { | ||||
| 		key := iter.Key() | ||||
| 		value := iter.Value() | ||||
| 
 | ||||
| 		fmt.Printf("%x(%d): ", key, len(key)) | ||||
| 		node := common.NewValueFromBytes(value) | ||||
| 		fmt.Printf("%v\n", node) | ||||
| 	} | ||||
| 	self.quit <- struct{}{} | ||||
| } | ||||
|  | ||||
| @ -1,26 +1,19 @@ | ||||
| package ethdb | ||||
| 
 | ||||
| /* | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"testing" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| ) | ||||
| 
 | ||||
| func TestCompression(t *testing.T) { | ||||
| 	db, err := NewLDBDatabase("testdb") | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| func newDb() *LDBDatabase { | ||||
| 	file := path.Join("/", "tmp", "ldbtesttmpfile") | ||||
| 	if common.FileExist(file) { | ||||
| 		os.RemoveAll(file) | ||||
| 	} | ||||
| 
 | ||||
| 	in := make([]byte, 10) | ||||
| 	db.Put([]byte("test1"), in) | ||||
| 	out, err := db.Get([]byte("test1")) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	db, _ := NewLDBDatabase(file) | ||||
| 
 | ||||
| 	if bytes.Compare(out, in) != 0 { | ||||
| 		t.Error("put get", in, out) | ||||
| 	} | ||||
| 	return db | ||||
| } | ||||
| */ | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user