From 3616080db46931202003157bacf10748008bebc0 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 21 Jan 2014 23:27:08 +0100 Subject: [PATCH] Added synchronisation of transactions across remote pools --- dev_console.go | 16 +++++++--- ethereum.go | 77 ++++++++++++++++++++++++++++----------------- peer.go | 6 ++-- server.go | 34 +++++++++++--------- transaction_pool.go | 26 ++++++++++++--- 5 files changed, 103 insertions(+), 56 deletions(-) diff --git a/dev_console.go b/dev_console.go index 5340a5f46..d14f019e5 100644 --- a/dev_console.go +++ b/dev_console.go @@ -12,15 +12,16 @@ import ( ) type Console struct { - db *ethdb.MemDatabase - trie *ethutil.Trie + db *ethdb.MemDatabase + trie *ethutil.Trie + server *Server } -func NewConsole() *Console { +func NewConsole(s *Server) *Console { db, _ := ethdb.NewMemDatabase() trie := ethutil.NewTrie(db, "") - return &Console{db: db, trie: trie} + return &Console{db: db, trie: trie, server: s} } func (i *Console) ValidateInput(action string, argumentLength int) error { @@ -43,6 +44,9 @@ func (i *Console) ValidateInput(action string, argumentLength int) error { case action == "encode" && argumentLength != 1: err = true expArgCount = 1 + case action == "tx" && argumentLength != 2: + err = true + expArgCount = 2 } if err { @@ -105,6 +109,10 @@ func (i *Console) ParseInput(input string) bool { fmt.Printf("%q\n", d) case "encode": fmt.Printf("%q\n", ethutil.Encode(tokens[1])) + case "tx": + tx := ethutil.NewTransaction(tokens[1], ethutil.Big(tokens[2]), []string{""}) + + i.server.txPool.QueueTransaction(tx) case "exit", "quit", "q": return false case "help": diff --git a/ethereum.go b/ethereum.go index d74cb4ff2..b7f059a02 100644 --- a/ethereum.go +++ b/ethereum.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "path" "runtime" ) @@ -44,36 +45,54 @@ func main() { Init() - if StartConsole { - console := NewConsole() - console.Start() - } else { - log.Println("Starting Ethereum") - server, err := NewServer() + ethutil.ReadConfig() - if err != nil { - log.Println(err) - return - } + server, err := NewServer() - RegisterInterupts(server) - - if StartMining { - log.Println("Mining started") - dagger := &Dagger{} - - go func() { - for { - res := dagger.Search(ethutil.Big("01001"), ethutil.BigPow(2, 36)) - log.Println("Res dagger", res) - //server.Broadcast("blockmine", ethutil.Encode(res.String())) - } - }() - } - - server.Start() - - // Wait for shutdown - server.WaitForShutdown() + if err != nil { + log.Println(err) + return } + + if StartConsole { + err := os.Mkdir(ethutil.Config.ExecPath, os.ModePerm) + // Error is OK if the error is ErrExist + if err != nil && !os.IsExist(err) { + log.Panic("Unable to create EXECPATH. Exiting") + } + + // TODO The logger will eventually be a non blocking logger. Logging is a expensive task + // Log to file only + file, err := os.OpenFile(path.Join(ethutil.Config.ExecPath, "debug.log"), os.O_RDWR|os.O_CREATE, os.ModePerm) + if err != nil { + log.Panic("Unable to set proper logger", err) + } + + ethutil.Config.Log = log.New(file, "", 0) + + console := NewConsole(server) + go console.Start() + } + + log.Println("Starting Ethereum") + + RegisterInterupts(server) + + if StartMining { + log.Println("Mining started") + dagger := &Dagger{} + + go func() { + for { + res := dagger.Search(ethutil.Big("01001"), ethutil.BigPow(2, 36)) + log.Println("Res dagger", res) + //server.Broadcast("blockmine", ethutil.Encode(res.String())) + } + }() + } + + server.Start() + + // Wait for shutdown + server.WaitForShutdown() } diff --git a/peer.go b/peer.go index 0f3422826..207f9e59f 100644 --- a/peer.go +++ b/peer.go @@ -62,7 +62,7 @@ func NewOutboundPeer(addr string, server *Server) *Peer { server: server, inbound: false, connected: 0, - disconnect: 1, + disconnect: 0, } // Set up the connection in another goroutine so we don't block the main thread @@ -169,12 +169,12 @@ out: // Version message p.handleHandshake(msg) case ethwire.MsgBlockTy: - err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(ethutil.Encode(msg.Data))) + err := p.server.blockManager.ProcessBlock(ethutil.NewBlock(msg.Data)) if err != nil { log.Println(err) } case ethwire.MsgTxTy: - p.server.txPool.QueueTransaction(ethutil.NewTransactionFromData(ethutil.Encode(msg.Data))) + p.server.txPool.QueueTransaction(ethutil.NewTransactionFromData(msg.Data)) case ethwire.MsgInvTy: case ethwire.MsgGetPeersTy: p.requestedPeerList = true diff --git a/server.go b/server.go index 3a35a43a2..2927f023a 100644 --- a/server.go +++ b/server.go @@ -48,7 +48,7 @@ func NewServer() (*Server, error) { return nil, err } - ethutil.SetConfig(db) + ethutil.Config.Db = db nonce, _ := ethutil.RandomUint64() server := &Server{ @@ -152,28 +152,30 @@ func (s *Server) Start() { s.Stop() } - - return } else { log.Fatal(err) } + } else { + // Starting accepting connections + go func() { + for { + conn, err := ln.Accept() + if err != nil { + log.Println(err) + + continue + } + + go s.AddPeer(conn) + } + }() } // Start the reaping processes go s.ReapDeadPeers() - go func() { - for { - conn, err := ln.Accept() - if err != nil { - log.Println(err) - - continue - } - - go s.AddPeer(conn) - } - }() + // Start the tx pool + s.txPool.Start() // TMP /* @@ -196,6 +198,8 @@ func (s *Server) Stop() { }) s.shutdownChan <- true + + s.txPool.Stop() } // This function will wait for a shutdown and resumes main thread execution diff --git a/transaction_pool.go b/transaction_pool.go index f645afd06..b302931de 100644 --- a/transaction_pool.go +++ b/transaction_pool.go @@ -1,9 +1,11 @@ package main import ( + "bytes" "container/list" "errors" "github.com/ethereum/ethutil-go" + "github.com/ethereum/ethwire-go" "log" "math/big" "sync" @@ -56,9 +58,11 @@ func NewTxPool(s *Server) *TxPool { // Blocking function. Don't use directly. Use QueueTransaction instead func (pool *TxPool) addTransaction(tx *ethutil.Transaction) { pool.mutex.Lock() - defer pool.mutex.Unlock() - pool.pool.PushBack(tx) + pool.mutex.Unlock() + + // Broadcast the transaction to the rest of the peers + pool.server.Broadcast(ethwire.MsgTxTy, tx.RlpEncode()) } // Process transaction validates the Tx and processes funds from the @@ -89,7 +93,12 @@ func (pool *TxPool) processTransaction(tx *ethutil.Transaction) error { // Make sure there's enough in the sender's account. Having insufficient // funds won't invalidate this transaction but simple ignores it. if sender.Amount.Cmp(tx.Value) < 0 { - return errors.New("Insufficient amount in sender's account") + if Debug { + log.Println("Insufficient amount in sender's account. Adding 1 ETH for debug") + sender.Amount = ethutil.BigPow(10, 18) + } else { + return errors.New("Insufficient amount in sender's account") + } } // Subtract the amount from the senders account @@ -121,6 +130,15 @@ out: for { select { case tx := <-pool.queueChan: + hash := tx.Hash() + foundTx := FindTx(pool.pool, func(tx *ethutil.Transaction, e *list.Element) bool { + return bytes.Compare(tx.Hash(), hash) == 0 + }) + + if foundTx != nil { + break + } + // Process the transaction err := pool.processTransaction(tx) if err != nil { @@ -144,8 +162,6 @@ func (pool *TxPool) Flush() { pool.mutex.Lock() defer pool.mutex.Unlock() - - pool.mutex.Unlock() } func (pool *TxPool) Start() {