Handle interrupts, resume loading

This commit is contained in:
Alexey Akhunov 2018-07-18 13:16:41 +01:00
parent 4ba51125bb
commit 6ab80b62c5
2 changed files with 65 additions and 44 deletions

47
main.go
View File

@ -6,12 +6,15 @@ package main
import ( import (
"bytes" "bytes"
"encoding/binary"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"os" "os"
"os/signal"
"runtime/pprof" "runtime/pprof"
"syscall"
"time" "time"
"github.com/cosmos/ethermint/core" "github.com/cosmos/ethermint/core"
@ -30,7 +33,6 @@ import (
var cpuprofile = flag.String("cpu-profile", "", "write cpu profile `file`") var cpuprofile = flag.String("cpu-profile", "", "write cpu profile `file`")
var blockchain = flag.String("blockchain", "data/blockchain", "file containing blocks to load") var blockchain = flag.String("blockchain", "data/blockchain", "file containing blocks to load")
var datadir = flag.String("datadir", "", "directory for ethermint data") var datadir = flag.String("datadir", "", "directory for ethermint data")
var blockStop = flag.Int("stop-block", 10000, "stop prematurely after processing a certain number of blocks")
var ( var (
// TODO: Document... // TODO: Document...
@ -57,6 +59,14 @@ func main() {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
sigs := make(chan os.Signal, 1)
interruptCh := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
interruptCh <- true
}()
stateDB := dbm.NewDB("state", dbm.LevelDBBackend, *datadir) stateDB := dbm.NewDB("state", dbm.LevelDBBackend, *datadir)
codeDB := dbm.NewDB("code", dbm.LevelDBBackend, *datadir) codeDB := dbm.NewDB("code", dbm.LevelDBBackend, *datadir)
@ -65,6 +75,8 @@ func main() {
panic(err) panic(err)
} }
// Only create genesis if it is a brand new database
if ethermintDB.LatestVersion() == 0 {
// start with empty root hash (i.e. empty state) // start with empty root hash (i.e. empty state)
gethStateDB, err := ethstate.New(ethcommon.Hash{}, ethermintDB) gethStateDB, err := ethstate.New(ethcommon.Hash{}, ethermintDB)
if err != nil { if err != nil {
@ -96,6 +108,7 @@ func main() {
fmt.Printf("commitID after genesis: %v\n", commitID) fmt.Printf("commitID after genesis: %v\n", commitID)
fmt.Printf("genesis state root hash: %x\n", genRoot[:]) fmt.Printf("genesis state root hash: %x\n", genRoot[:])
}
// file with blockchain data exported from geth by using "geth exportdb" // file with blockchain data exported from geth by using "geth exportdb"
// command. // command.
@ -120,14 +133,18 @@ func main() {
root501 ethcommon.Hash // root hash after block 501 root501 ethcommon.Hash // root hash after block 501
) )
prevRoot := genRoot var prevRoot ethcommon.Hash
binary.BigEndian.PutUint64(prevRoot[:8], uint64(ethermintDB.LatestVersion()))
ethermintDB.Tracing = true ethermintDB.Tracing = true
chainContext := core.NewChainContext() chainContext := core.NewChainContext()
vmConfig := ethvm.Config{} vmConfig := ethvm.Config{}
n := 0 n := 0
startTime := time.Now() startTime := time.Now()
for { interrupt := false
var lastSkipped uint64
for !interrupt {
if err = stream.Decode(&block); err == io.EOF { if err = stream.Decode(&block); err == io.EOF {
err = nil err = nil
break break
@ -135,10 +152,15 @@ func main() {
panic(fmt.Errorf("failed to decode at block %d: %s", block.NumberU64(), err)) panic(fmt.Errorf("failed to decode at block %d: %s", block.NumberU64(), err))
} }
// don't import first block // don't import blocks already imported
if block.NumberU64() == 0 { if block.NumberU64() < uint64(ethermintDB.LatestVersion()) {
lastSkipped = block.NumberU64()
continue continue
} }
if lastSkipped > 0 {
fmt.Printf("Skipped blocks up to %d\n", lastSkipped)
lastSkipped = 0
}
header := block.Header() header := block.Header()
chainContext.Coinbase = header.Coinbase chainContext.Coinbase = header.Coinbase
@ -225,9 +247,11 @@ func main() {
fmt.Printf("processed %d blocks, time so far: %v\n", n, time.Since(startTime)) fmt.Printf("processed %d blocks, time so far: %v\n", n, time.Since(startTime))
} }
if *blockStop == n { // Check for interrupts
fmt.Println("haulting process prematurely") select {
break case interrupt = <-interruptCh:
fmt.Printf("Interrupted, please wait for cleanup...\n")
default:
} }
} }
@ -235,13 +259,6 @@ func main() {
ethermintDB.Tracing = true ethermintDB.Tracing = true
genState, err := ethstate.New(genRoot, ethermintDB)
if err != nil {
panic(err)
}
fmt.Printf("balance of one of the genesis investors: %s\n", genState.GetBalance(genInvestor))
// try to create a new geth stateDB from root of the block 500 // try to create a new geth stateDB from root of the block 500
fmt.Printf("root500: %x\n", root500[:]) fmt.Printf("root500: %x\n", root500[:])

View File

@ -95,6 +95,10 @@ func NewDatabase(stateDB, codeDB dbm.DB) (*Database, error) {
return db, nil return db, nil
} }
func (db *Database) LatestVersion() int64 {
return db.stateStore.LastCommitID().Version
}
// OpenTrie implements Ethereum's state.Database interface. It returns a Trie // OpenTrie implements Ethereum's state.Database interface. It returns a Trie
// type which implements the Ethereum state.Trie interface. It us used for // type which implements the Ethereum state.Trie interface. It us used for
// storage of accounts. An error is returned if state cannot load for a // storage of accounts. An error is returned if state cannot load for a