Merge pull request #700 from bas-vk/issue_650

Added blockchain DB versioning support, closes #650
This commit is contained in:
Jeffrey Wilcke 2015-04-13 17:34:34 +02:00
commit 5f9346bc7a
6 changed files with 188 additions and 29 deletions

View File

@ -36,11 +36,13 @@ import (
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/peterh/liner" "github.com/peterh/liner"
"path"
) )
import _ "net/http/pprof" import _ "net/http/pprof"
@ -208,12 +210,18 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
Name: "export", Name: "export",
Usage: `export blockchain into file`, Usage: `export blockchain into file`,
}, },
{
Action: upgradeDb,
Name: "upgradedb",
Usage: "upgrade chainblock database",
},
} }
app.Flags = []cli.Flag{ app.Flags = []cli.Flag{
utils.UnlockedAccountFlag, utils.UnlockedAccountFlag,
utils.PasswordFileFlag, utils.PasswordFileFlag,
utils.BootnodesFlag, utils.BootnodesFlag,
utils.DataDirFlag, utils.DataDirFlag,
utils.BlockchainVersionFlag,
utils.JSpathFlag, utils.JSpathFlag,
utils.ListenPortFlag, utils.ListenPortFlag,
utils.MaxPeersFlag, utils.MaxPeersFlag,
@ -437,13 +445,29 @@ func importchain(ctx *cli.Context) {
if len(ctx.Args()) != 1 { if len(ctx.Args()) != 1 {
utils.Fatalf("This command requires an argument.") utils.Fatalf("This command requires an argument.")
} }
chainmgr, _, _ := utils.GetChain(ctx)
cfg := utils.MakeEthConfig(ClientIdentifier, Version, ctx)
cfg.SkipBcVersionCheck = true
ethereum, err := eth.New(cfg)
if err != nil {
utils.Fatalf("%v\n", err)
}
chainmgr := ethereum.ChainManager()
start := time.Now() start := time.Now()
err := utils.ImportChain(chainmgr, ctx.Args().First()) err = utils.ImportChain(chainmgr, ctx.Args().First())
if err != nil { if err != nil {
utils.Fatalf("Import error: %v\n", err) utils.Fatalf("Import error: %v\n", err)
} }
// force database flush
ethereum.BlockDb().Close()
ethereum.StateDb().Close()
ethereum.ExtraDb().Close()
fmt.Printf("Import done in %v", time.Since(start)) fmt.Printf("Import done in %v", time.Since(start))
return return
} }
@ -451,9 +475,18 @@ func exportchain(ctx *cli.Context) {
if len(ctx.Args()) != 1 { if len(ctx.Args()) != 1 {
utils.Fatalf("This command requires an argument.") utils.Fatalf("This command requires an argument.")
} }
chainmgr, _, _ := utils.GetChain(ctx)
cfg := utils.MakeEthConfig(ClientIdentifier, Version, ctx)
cfg.SkipBcVersionCheck = true
ethereum, err := eth.New(cfg)
if err != nil {
utils.Fatalf("%v\n", err)
}
chainmgr := ethereum.ChainManager()
start := time.Now() start := time.Now()
err := utils.ExportChain(chainmgr, ctx.Args().First()) err = utils.ExportChain(chainmgr, ctx.Args().First())
if err != nil { if err != nil {
utils.Fatalf("Export error: %v\n", err) utils.Fatalf("Export error: %v\n", err)
} }
@ -461,6 +494,60 @@ func exportchain(ctx *cli.Context) {
return return
} }
func upgradeDb(ctx *cli.Context) {
fmt.Println("Upgrade blockchain DB")
cfg := utils.MakeEthConfig(ClientIdentifier, Version, ctx)
cfg.SkipBcVersionCheck = true
ethereum, err := eth.New(cfg)
if err != nil {
utils.Fatalf("%v\n", err)
}
v, _ := ethereum.BlockDb().Get([]byte("BlockchainVersion"))
bcVersion := int(common.NewValue(v).Uint())
if bcVersion == 0 {
bcVersion = core.BlockChainVersion
}
filename := fmt.Sprintf("blockchain_%d_%s.chain", bcVersion, time.Now().Format("2006-01-02_15:04:05"))
exportFile := path.Join(ctx.GlobalString(utils.DataDirFlag.Name), filename)
err = utils.ExportChain(ethereum.ChainManager(), exportFile)
if err != nil {
utils.Fatalf("Unable to export chain for reimport %s\n", err)
}
ethereum.BlockDb().Close()
ethereum.StateDb().Close()
ethereum.ExtraDb().Close()
os.RemoveAll(path.Join(ctx.GlobalString(utils.DataDirFlag.Name), "blockchain"))
ethereum, err = eth.New(cfg)
if err != nil {
utils.Fatalf("%v\n", err)
}
ethereum.BlockDb().Put([]byte("BlockchainVersion"), common.NewValue(core.BlockChainVersion).Bytes())
err = utils.ImportChain(ethereum.ChainManager(), exportFile)
if err != nil {
utils.Fatalf("Import error %v (a backup is made in %s, use the import command to import it)\n", err, exportFile)
}
// force database flush
ethereum.BlockDb().Close()
ethereum.StateDb().Close()
ethereum.ExtraDb().Close()
os.Remove(exportFile)
fmt.Println("Import finished")
}
func dump(ctx *cli.Context) { func dump(ctx *cli.Context) {
chainmgr, _, stateDb := utils.GetChain(ctx) chainmgr, _, stateDb := utils.GetChain(ctx)
for _, arg := range ctx.Args() { for _, arg := range ctx.Args() {

View File

@ -155,7 +155,11 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
chainmgr.Reset() chainmgr.Reset()
stream := rlp.NewStream(fh) stream := rlp.NewStream(fh)
var i int var i, n int
batchSize := 2500
blocks := make(types.Blocks, batchSize)
for ; ; i++ { for ; ; i++ {
var b types.Block var b types.Block
if err := stream.Decode(&b); err == io.EOF { if err := stream.Decode(&b); err == io.EOF {
@ -163,10 +167,25 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
} else if err != nil { } else if err != nil {
return fmt.Errorf("at block %d: %v", i, err) return fmt.Errorf("at block %d: %v", i, err)
} }
if err := chainmgr.InsertChain(types.Blocks{&b}); err != nil {
return fmt.Errorf("invalid block %d: %v", i, err) blocks[n] = &b
n++
if n == batchSize {
if err := chainmgr.InsertChain(blocks); err != nil {
return fmt.Errorf("invalid block %v", err)
}
n = 0
blocks = make(types.Blocks, batchSize)
} }
} }
if n > 0 {
if err := chainmgr.InsertChain(blocks[:n]); err != nil {
return fmt.Errorf("invalid block %v", err)
}
}
fmt.Printf("imported %d blocks\n", i) fmt.Printf("imported %d blocks\n", i)
return nil return nil
} }

View File

@ -7,6 +7,7 @@ import (
"runtime" "runtime"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -83,6 +84,11 @@ var (
Usage: "Network Id", Usage: "Network Id",
Value: eth.NetworkId, Value: eth.NetworkId,
} }
BlockchainVersionFlag = cli.IntFlag{
Name: "blockchainversion",
Usage: "Blockchain version",
Value: core.BlockChainVersion,
}
// miner settings // miner settings
MinerThreadsFlag = cli.IntFlag{ MinerThreadsFlag = cli.IntFlag{
@ -237,29 +243,32 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
glog.SetLogDir(ctx.GlobalString(LogFileFlag.Name)) glog.SetLogDir(ctx.GlobalString(LogFileFlag.Name))
return &eth.Config{ return &eth.Config{
Name: common.MakeName(clientID, version), Name: common.MakeName(clientID, version),
DataDir: ctx.GlobalString(DataDirFlag.Name), DataDir: ctx.GlobalString(DataDirFlag.Name),
ProtocolVersion: ctx.GlobalInt(ProtocolVersionFlag.Name), ProtocolVersion: ctx.GlobalInt(ProtocolVersionFlag.Name),
NetworkId: ctx.GlobalInt(NetworkIdFlag.Name), BlockChainVersion: ctx.GlobalInt(BlockchainVersionFlag.Name),
LogFile: ctx.GlobalString(LogFileFlag.Name), SkipBcVersionCheck: false,
LogLevel: ctx.GlobalInt(LogLevelFlag.Name), NetworkId: ctx.GlobalInt(NetworkIdFlag.Name),
LogJSON: ctx.GlobalString(LogJSONFlag.Name), LogFile: ctx.GlobalString(LogFileFlag.Name),
Etherbase: ctx.GlobalString(EtherbaseFlag.Name), LogLevel: ctx.GlobalInt(LogLevelFlag.Name),
MinerThreads: ctx.GlobalInt(MinerThreadsFlag.Name), LogJSON: ctx.GlobalString(LogJSONFlag.Name),
AccountManager: GetAccountManager(ctx), Etherbase: ctx.GlobalString(EtherbaseFlag.Name),
VmDebug: ctx.GlobalBool(VMDebugFlag.Name), MinerThreads: ctx.GlobalInt(MinerThreadsFlag.Name),
MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), AccountManager: GetAccountManager(ctx),
Port: ctx.GlobalString(ListenPortFlag.Name), VmDebug: ctx.GlobalBool(VMDebugFlag.Name),
NAT: GetNAT(ctx), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name),
NodeKey: GetNodeKey(ctx), Port: ctx.GlobalString(ListenPortFlag.Name),
Shh: true, NAT: GetNAT(ctx),
Dial: true, NodeKey: GetNodeKey(ctx),
BootNodes: ctx.GlobalString(BootnodesFlag.Name), Shh: true,
Dial: true,
BootNodes: ctx.GlobalString(BootnodesFlag.Name),
} }
} }
func GetChain(ctx *cli.Context) (*core.ChainManager, common.Database, common.Database) { func GetChain(ctx *cli.Context) (*core.ChainManager, common.Database, common.Database) {
dataDir := ctx.GlobalString(DataDirFlag.Name) dataDir := ctx.GlobalString(DataDirFlag.Name)
blockDb, err := ethdb.NewLDBDatabase(path.Join(dataDir, "blockchain")) blockDb, err := ethdb.NewLDBDatabase(path.Join(dataDir, "blockchain"))
if err != nil { if err != nil {
Fatalf("Could not open database: %v", err) Fatalf("Could not open database: %v", err)
@ -269,7 +278,20 @@ func GetChain(ctx *cli.Context) (*core.ChainManager, common.Database, common.Dat
if err != nil { if err != nil {
Fatalf("Could not open database: %v", err) Fatalf("Could not open database: %v", err)
} }
return core.NewChainManager(blockDb, stateDb, new(event.TypeMux)), blockDb, stateDb
extraDb, err := ethdb.NewLDBDatabase(path.Join(dataDir, "extra"))
if err != nil {
Fatalf("Could not open database: %v", err)
}
eventMux := new(event.TypeMux)
chainManager := core.NewChainManager(blockDb, stateDb, eventMux)
pow := ethash.New(chainManager)
txPool := core.NewTxPool(eventMux, chainManager.State)
blockProcessor := core.NewBlockProcessor(stateDb, extraDb, pow, txPool, chainManager, eventMux)
chainManager.SetProcessor(blockProcessor)
return chainManager, blockDb, stateDb
} }
func GetAccountManager(ctx *cli.Context) *accounts.Manager { func GetAccountManager(ctx *cli.Context) *accounts.Manager {

View File

@ -18,6 +18,12 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
const (
// must be bumped when consensus algorithm is changed, this forces the upgradedb
// command to be run (forces the blocks to be imported again using the new algorithm)
BlockChainVersion = 1
)
var statelogger = logger.NewLogger("BLOCK") var statelogger = logger.NewLogger("BLOCK")
type BlockProcessor struct { type BlockProcessor struct {

View File

@ -284,11 +284,14 @@ func (self *ChainManager) Export(w io.Writer) error {
defer self.mu.RUnlock() defer self.mu.RUnlock()
glog.V(logger.Info).Infof("exporting %v blocks...\n", self.currentBlock.Header().Number) glog.V(logger.Info).Infof("exporting %v blocks...\n", self.currentBlock.Header().Number)
for block := self.currentBlock; block != nil; block = self.GetBlock(block.Header().ParentHash) { last := self.currentBlock.NumberU64()
if err := block.EncodeRLP(w); err != nil {
for nr := uint64(0); nr <= last; nr++ {
if err := self.GetBlockByNumber(nr).EncodeRLP(w); err != nil {
return err return err
} }
} }
return nil return nil
} }

View File

@ -42,6 +42,9 @@ type Config struct {
ProtocolVersion int ProtocolVersion int
NetworkId int NetworkId int
BlockChainVersion int
SkipBcVersionCheck bool // e.g. blockchain export
DataDir string DataDir string
LogFile string LogFile string
LogLevel int LogLevel int
@ -149,7 +152,7 @@ type Ethereum struct {
} }
func New(config *Config) (*Ethereum, error) { func New(config *Config) (*Ethereum, error) {
// Boostrap database // Bootstrap database
logger.New(config.DataDir, config.LogFile, config.LogLevel) logger.New(config.DataDir, config.LogFile, config.LogLevel)
if len(config.LogJSON) > 0 { if len(config.LogJSON) > 0 {
logger.NewJSONsystem(config.DataDir, config.LogJSON) logger.NewJSONsystem(config.DataDir, config.LogJSON)
@ -179,6 +182,16 @@ func New(config *Config) (*Ethereum, error) {
saveProtocolVersion(blockDb, config.ProtocolVersion) saveProtocolVersion(blockDb, config.ProtocolVersion)
glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId) glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId)
if !config.SkipBcVersionCheck {
b, _ := blockDb.Get([]byte("BlockchainVersion"))
bcVersion := int(common.NewValue(b).Uint())
if bcVersion != config.BlockChainVersion && bcVersion != 0 {
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, config.BlockChainVersion)
}
saveBlockchainVersion(blockDb, config.BlockChainVersion)
}
glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
eth := &Ethereum{ eth := &Ethereum{
shutdownChan: make(chan bool), shutdownChan: make(chan bool),
blockDb: blockDb, blockDb: blockDb,
@ -472,3 +485,12 @@ func saveProtocolVersion(db common.Database, protov int) {
db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes()) db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes())
} }
} }
func saveBlockchainVersion(db common.Database, bcVersion int) {
d, _ := db.Get([]byte("BlockchainVersion"))
blockchainVersion := common.NewValue(d).Uint()
if blockchainVersion == 0 {
db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes())
}
}