Add additional extra database for non-protocol related data

* Add transaction to extra database after a successful block process
This commit is contained in:
obscuren 2015-03-12 14:50:35 +01:00
parent 31a95151c9
commit ef6706696c
5 changed files with 31 additions and 13 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/state"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
@ -23,7 +24,8 @@ type PendingBlockEvent struct {
var statelogger = logger.NewLogger("BLOCK") var statelogger = logger.NewLogger("BLOCK")
type BlockProcessor struct { type BlockProcessor struct {
db ethutil.Database db ethutil.Database
extraDb ethutil.Database
// Mutex for locking the block processor. Blocks can only be handled one at a time // Mutex for locking the block processor. Blocks can only be handled one at a time
mutex sync.Mutex mutex sync.Mutex
// Canonical block chain // Canonical block chain
@ -45,9 +47,10 @@ type BlockProcessor struct {
eventMux *event.TypeMux eventMux *event.TypeMux
} }
func NewBlockProcessor(db ethutil.Database, pow pow.PoW, txpool *TxPool, chainManager *ChainManager, eventMux *event.TypeMux) *BlockProcessor { func NewBlockProcessor(db, extra ethutil.Database, pow pow.PoW, txpool *TxPool, chainManager *ChainManager, eventMux *event.TypeMux) *BlockProcessor {
sm := &BlockProcessor{ sm := &BlockProcessor{
db: db, db: db,
extraDb: extra,
mem: make(map[string]*big.Int), mem: make(map[string]*big.Int),
Pow: pow, Pow: pow,
bc: chainManager, bc: chainManager,
@ -230,6 +233,10 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (td *big
// Remove transactions from the pool // Remove transactions from the pool
sm.txpool.RemoveSet(block.Transactions()) sm.txpool.RemoveSet(block.Transactions())
for _, tx := range block.Transactions() {
putTx(sm.extraDb, tx)
}
chainlogger.Infof("processed block #%d (%x...)\n", header.Number, block.Hash()[0:4]) chainlogger.Infof("processed block #%d (%x...)\n", header.Number, block.Hash()[0:4])
return td, nil return td, nil
@ -347,3 +354,12 @@ func (sm *BlockProcessor) GetLogs(block *types.Block) (logs state.Logs, err erro
return state.Logs(), nil return state.Logs(), nil
} }
func putTx(db ethutil.Database, tx *types.Transaction) {
rlpEnc, err := rlp.EncodeToBytes(tx)
if err != nil {
statelogger.Infoln("Failed encoding tx", err)
return
}
db.Put(tx.Hash(), rlpEnc)
}

View File

@ -14,7 +14,7 @@ func proc() (*BlockProcessor, *ChainManager) {
var mux event.TypeMux var mux event.TypeMux
chainMan := NewChainManager(db, db, &mux) chainMan := NewChainManager(db, db, &mux)
return NewBlockProcessor(db, ezp.New(), nil, chainMan, &mux), chainMan return NewBlockProcessor(db, db, ezp.New(), nil, chainMan, &mux), chainMan
} }
func TestNumber(t *testing.T) { func TestNumber(t *testing.T) {

View File

@ -120,7 +120,7 @@ func newChainManager(block *types.Block, eventMux *event.TypeMux, db ethutil.Dat
// block processor with fake pow // block processor with fake pow
func newBlockProcessor(db ethutil.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor { func newBlockProcessor(db ethutil.Database, txpool *TxPool, cman *ChainManager, eventMux *event.TypeMux) *BlockProcessor {
bman := NewBlockProcessor(db, FakePow{}, txpool, newChainManager(nil, eventMux, db), eventMux) bman := NewBlockProcessor(db, db, FakePow{}, txpool, newChainManager(nil, eventMux, db), eventMux)
return bman return bman
} }

View File

@ -257,7 +257,7 @@ func TestChainInsertions(t *testing.T) {
var eventMux event.TypeMux var eventMux event.TypeMux
chainMan := NewChainManager(db, db, &eventMux) chainMan := NewChainManager(db, db, &eventMux)
txPool := NewTxPool(&eventMux) txPool := NewTxPool(&eventMux)
blockMan := NewBlockProcessor(db, nil, txPool, chainMan, &eventMux) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan) chainMan.SetProcessor(blockMan)
const max = 2 const max = 2
@ -303,7 +303,7 @@ func TestChainMultipleInsertions(t *testing.T) {
var eventMux event.TypeMux var eventMux event.TypeMux
chainMan := NewChainManager(db, db, &eventMux) chainMan := NewChainManager(db, db, &eventMux)
txPool := NewTxPool(&eventMux) txPool := NewTxPool(&eventMux)
blockMan := NewBlockProcessor(db, nil, txPool, chainMan, &eventMux) blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan) chainMan.SetProcessor(blockMan)
done := make(chan bool, max) done := make(chan bool, max)
for i, chain := range chains { for i, chain := range chains {

View File

@ -107,9 +107,10 @@ type Ethereum struct {
// Channel for shutting down the ethereum // Channel for shutting down the ethereum
shutdownChan chan bool shutdownChan chan bool
// DB interface // DB interfaces
blockDb ethutil.Database blockDb ethutil.Database // Block chain database
stateDb ethutil.Database stateDb ethutil.Database // State changes database
extraDb ethutil.Database // Extra database (txs, etc)
//*** SERVICES *** //*** SERVICES ***
// State manager for processing new blocks and managing the over all states // State manager for processing new blocks and managing the over all states
@ -144,6 +145,7 @@ func New(config *Config) (*Ethereum, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
extraDb, err := ethdb.NewLDBDatabase(path.Join(config.DataDir, "extra"))
// Perform database sanity checks // Perform database sanity checks
d, _ := blockDb.Get([]byte("ProtocolVersion")) d, _ := blockDb.Get([]byte("ProtocolVersion"))
@ -152,14 +154,13 @@ func New(config *Config) (*Ethereum, error) {
path := path.Join(config.DataDir, "blockchain") path := path.Join(config.DataDir, "blockchain")
return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, path) return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, path)
} }
saveProtocolVersion(extraDb)
saveProtocolVersion(blockDb)
//ethutil.Config.Db = db
eth := &Ethereum{ eth := &Ethereum{
shutdownChan: make(chan bool), shutdownChan: make(chan bool),
blockDb: blockDb, blockDb: blockDb,
stateDb: stateDb, stateDb: stateDb,
extraDb: extraDb,
eventMux: &event.TypeMux{}, eventMux: &event.TypeMux{},
logger: servlogger, logger: servlogger,
accountManager: config.AccountManager, accountManager: config.AccountManager,
@ -169,7 +170,7 @@ func New(config *Config) (*Ethereum, error) {
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
pow := ethash.New(eth.chainManager) pow := ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux()) eth.txPool = core.NewTxPool(eth.EventMux())
eth.blockProcessor = core.NewBlockProcessor(stateDb, pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor) eth.chainManager.SetProcessor(eth.blockProcessor)
eth.whisper = whisper.New() eth.whisper = whisper.New()
eth.miner = miner.New(eth, pow, config.MinerThreads) eth.miner = miner.New(eth, pow, config.MinerThreads)
@ -230,6 +231,7 @@ func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper }
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux } func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *Ethereum) BlockDb() ethutil.Database { return s.blockDb } func (s *Ethereum) BlockDb() ethutil.Database { return s.blockDb }
func (s *Ethereum) StateDb() ethutil.Database { return s.stateDb } func (s *Ethereum) StateDb() ethutil.Database { return s.stateDb }
func (s *Ethereum) ExtraDb() ethutil.Database { return s.extraDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) PeerCount() int { return s.net.PeerCount() } func (s *Ethereum) PeerCount() int { return s.net.PeerCount() }
func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() } func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() }