From f0998415ba9a73f0add32f9b5aed2aec98b9a7f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 8 Aug 2018 12:15:08 +0300 Subject: [PATCH] cmd, consensus/ethash, eth: miner push notifications --- cmd/geth/main.go | 3 +- cmd/geth/usage.go | 1 + cmd/utils/flags.go | 14 +++- consensus/ethash/algorithm_test.go | 2 +- consensus/ethash/consensus.go | 2 +- consensus/ethash/ethash.go | 18 +++-- consensus/ethash/ethash_test.go | 43 +++++------ consensus/ethash/sealer.go | 90 ++++++++++++++-------- consensus/ethash/sealer_test.go | 115 +++++++++++++++++++++++++++++ eth/backend.go | 8 +- eth/config.go | 1 + les/backend.go | 2 +- 12 files changed, 226 insertions(+), 73 deletions(-) create mode 100644 consensus/ethash/sealer_test.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 77ef6afe2..d556ad92c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -98,8 +98,9 @@ var ( utils.MaxPendingPeersFlag, utils.EtherbaseFlag, utils.GasPriceFlag, - utils.MinerThreadsFlag, utils.MiningEnabledFlag, + utils.MinerThreadsFlag, + utils.MinerNotifyFlag, utils.TargetGasLimitFlag, utils.NATFlag, utils.NoDiscoverFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 6a12a66cc..9d63c68f7 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -185,6 +185,7 @@ var AppHelpFlagGroups = []flagGroup{ Flags: []cli.Flag{ utils.MiningEnabledFlag, utils.MinerThreadsFlag, + utils.MinerNotifyFlag, utils.EtherbaseFlag, utils.TargetGasLimitFlag, utils.GasPriceFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 522ad06b6..d6142f246 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -24,7 +24,6 @@ import ( "math/big" "os" "path/filepath" - "runtime" "strconv" "strings" "time" @@ -318,9 +317,13 @@ var ( Usage: "Enable mining", } MinerThreadsFlag = cli.IntFlag{ - Name: "minerthreads", + Name: "miner.threads", Usage: "Number of CPU threads to use for mining", - Value: runtime.NumCPU(), + Value: 0, + } + MinerNotifyFlag = cli.StringFlag{ + Name: "miner.notify", + Usage: "Comma separated HTTP URL list to notify of new work packages", } TargetGasLimitFlag = cli.Uint64Flag{ Name: "targetgaslimit", @@ -1093,6 +1096,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(MinerThreadsFlag.Name) { cfg.MinerThreads = ctx.GlobalInt(MinerThreadsFlag.Name) } + if ctx.GlobalIsSet(MinerNotifyFlag.Name) { + cfg.MinerNotify = strings.Split(ctx.GlobalString(MinerNotifyFlag.Name), ",") + } if ctx.GlobalIsSet(DocRootFlag.Name) { cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name) } @@ -1293,7 +1299,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai DatasetDir: stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir), DatasetsInMem: eth.DefaultConfig.Ethash.DatasetsInMem, DatasetsOnDisk: eth.DefaultConfig.Ethash.DatasetsOnDisk, - }) + }, nil) } } if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" { diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index e7625f7c0..db22cccd0 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -729,7 +729,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() - ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) + ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil) defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index eb0f73d98..e18a06d52 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -493,7 +493,7 @@ func (ethash *Ethash) VerifySeal(chain consensus.ChainReader, header *types.Head if !bytes.Equal(header.MixDigest[:], digest) { return errInvalidMixDigest } - target := new(big.Int).Div(maxUint256, header.Difficulty) + target := new(big.Int).Div(two256, header.Difficulty) if new(big.Int).SetBytes(result).Cmp(target) > 0 { return errInvalidPoW } diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 0cb3059b9..19c94deb6 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -45,11 +45,11 @@ import ( var ErrInvalidDumpMagic = errors.New("invalid dump magic") var ( - // maxUint256 is a big integer representing 2^256-1 - maxUint256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) + // two256 is a big integer representing 2^256 + two256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) // sharedEthash is a full instance that can be shared between multiple users. - sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}) + sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil) // algorithmRevision is the data structure version used for file naming. algorithmRevision = 23 @@ -447,8 +447,10 @@ type Ethash struct { exitCh chan chan error // Notification channel to exiting backend threads } -// New creates a full sized ethash PoW scheme and starts a background thread for remote mining. -func New(config Config) *Ethash { +// New creates a full sized ethash PoW scheme and starts a background thread for +// remote mining, also optionally notifying a batch of remote services of new work +// packages. +func New(config Config, notify []string) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) config.CachesInMem = 1 @@ -473,13 +475,13 @@ func New(config Config) *Ethash { submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote() + go ethash.remote(notify) return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. -func NewTester() *Ethash { +func NewTester(notify []string) *Ethash { ethash := &Ethash{ config: Config{PowMode: ModeTest}, caches: newlru("cache", 1, newCache), @@ -494,7 +496,7 @@ func NewTester() *Ethash { submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote() + go ethash.remote(notify) return ethash } diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index ccdd30fb0..87ac17c2b 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -32,17 +32,18 @@ import ( // Tests that ethash works correctly in test mode. func TestTestMode(t *testing.T) { - head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - ethash := NewTester() + ethash := NewTester(nil) defer ethash.Close() - block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) + + block, err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil) if err != nil { t.Fatalf("failed to seal block: %v", err) } - head.Nonce = types.EncodeNonce(block.Nonce()) - head.MixDigest = block.MixDigest() - if err := ethash.VerifySeal(nil, head); err != nil { + header.Nonce = types.EncodeNonce(block.Nonce()) + header.MixDigest = block.MixDigest() + if err := ethash.VerifySeal(nil, header); err != nil { t.Fatalf("unexpected verification error: %v", err) } } @@ -55,7 +56,7 @@ func TestCacheFileEvict(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(tmpdir) - e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) + e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}, nil) defer e.Close() workers := 8 @@ -78,21 +79,21 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { if block < 0 { block = 0 } - head := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)} - e.VerifySeal(nil, head) + header := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)} + e.VerifySeal(nil, header) } } func TestRemoteSealer(t *testing.T) { - ethash := NewTester() + ethash := NewTester(nil) defer ethash.Close() + api := &API{ethash} if _, err := api.GetWork(); err != errNoMiningWork { t.Error("expect to return an error indicate there is no mining work") } - - head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - block := types.NewBlockWithHeader(head) + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) // Push new work. ethash.Seal(nil, block, nil) @@ -108,16 +109,14 @@ func TestRemoteSealer(t *testing.T) { if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { t.Error("expect to return false when submit a fake solution") } - // Push new block with same block number to replace the original one. - head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} - block = types.NewBlockWithHeader(head) + header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} + block = types.NewBlockWithHeader(header) ethash.Seal(nil, block, nil) if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { t.Error("expect to return the latest pushed work") } - // Push block with higher block number. newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} newBlock := types.NewBlockWithHeader(newHead) @@ -130,19 +129,18 @@ func TestRemoteSealer(t *testing.T) { func TestHashRate(t *testing.T) { var ( - ethash = NewTester() - api = &API{ethash} hashrate = []hexutil.Uint64{100, 200, 300} expect uint64 ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} ) - + ethash := NewTester(nil) defer ethash.Close() if tot := ethash.Hashrate(); tot != 0 { t.Error("expect the result should be zero") } + api := &API{ethash} for i := 0; i < len(hashrate); i += 1 { if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { t.Error("remote miner submit hashrate failed") @@ -155,9 +153,8 @@ func TestHashRate(t *testing.T) { } func TestClosedRemoteSealer(t *testing.T) { - ethash := NewTester() - // Make sure exit channel has been listened - time.Sleep(1 * time.Second) + ethash := NewTester(nil) + time.Sleep(1 * time.Second) // ensure exit channel is listening ethash.Close() api := &API{ethash} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index a9449d406..03d848473 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -17,11 +17,14 @@ package ethash import ( + "bytes" crand "crypto/rand" + "encoding/json" "errors" "math" "math/big" "math/rand" + "net/http" "runtime" "sync" "time" @@ -109,7 +112,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s var ( header = block.Header() hash = header.HashNoNonce().Bytes() - target = new(big.Int).Div(maxUint256, header.Difficulty) + target = new(big.Int).Div(two256, header.Difficulty) number = header.Number.Uint64() dataset = ethash.dataset(number) ) @@ -161,40 +164,65 @@ search: runtime.KeepAlive(dataset) } -// remote starts a standalone goroutine to handle remote mining related stuff. -func (ethash *Ethash) remote() { +// remote is a standalone goroutine to handle remote mining related stuff. +func (ethash *Ethash) remote(notify []string) { var ( - works = make(map[common.Hash]*types.Block) - rates = make(map[common.Hash]hashrate) - currentWork *types.Block - ) + works = make(map[common.Hash]*types.Block) + rates = make(map[common.Hash]hashrate) - // getWork returns a work package for external miner. + currentBlock *types.Block + currentWork [3]string + + notifyTransport = &http.Transport{} + notifyClient = &http.Client{ + Transport: notifyTransport, + Timeout: time.Second, + } + notifyReqs = make([]*http.Request, len(notify)) + ) + // notifyWork notifies all the specified mining endpoints of the availability of + // new work to be processed. + notifyWork := func() { + work := currentWork + blob, _ := json.Marshal(work) + + for i, url := range notify { + // Terminate any previously pending request and create the new work + if notifyReqs[i] != nil { + notifyTransport.CancelRequest(notifyReqs[i]) + } + notifyReqs[i], _ = http.NewRequest("POST", url, bytes.NewReader(blob)) + notifyReqs[i].Header.Set("Content-Type", "application/json") + + // Push the new work concurrently to all the remote nodes + go func(req *http.Request, url string) { + res, err := notifyClient.Do(req) + if err != nil { + log.Warn("Failed to notify remote miner", "err", err) + } else { + log.Trace("Notified remote miner", "miner", url, "hash", log.Lazy{Fn: func() common.Hash { return common.HexToHash(work[0]) }}, "target", work[2]) + res.Body.Close() + } + }(notifyReqs[i], url) + } + } + // makeWork creates a work package for external miner. // // The work package consists of 3 strings: // result[0], 32 bytes hex encoded current block header pow-hash // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty - getWork := func() ([3]string, error) { - var res [3]string - if currentWork == nil { - return res, errNoMiningWork - } - res[0] = currentWork.HashNoNonce().Hex() - res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() + makeWork := func(block *types.Block) { + hash := block.HashNoNonce() - // Calculate the "target" to be returned to the external sealer. - n := big.NewInt(1) - n.Lsh(n, 255) - n.Div(n, currentWork.Difficulty()) - n.Lsh(n, 1) - res[2] = common.BytesToHash(n.Bytes()).Hex() + currentWork[0] = hash.Hex() + currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() + currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() // Trace the seal work fetched by remote sealer. - works[currentWork.HashNoNonce()] = currentWork - return res, nil + currentBlock = block + works[hash] = block } - // submitWork verifies the submitted pow solution, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no pending work or stale mining result). @@ -238,21 +266,23 @@ func (ethash *Ethash) remote() { for { select { case block := <-ethash.workCh: - if currentWork != nil && block.ParentHash() != currentWork.ParentHash() { + if currentBlock != nil && block.ParentHash() != currentBlock.ParentHash() { // Start new round mining, throw out all previous work. works = make(map[common.Hash]*types.Block) } // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. - currentWork = block + makeWork(block) + + // Notify and requested URLs of the new work availability + notifyWork() case work := <-ethash.fetchWorkCh: // Return current mining work to remote miner. - miningWork, err := getWork() - if err != nil { - work.errc <- err + if currentBlock == nil { + work.errc <- errNoMiningWork } else { - work.res <- miningWork + work.res <- currentWork } case result := <-ethash.submitWorkCh: diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go new file mode 100644 index 000000000..6d8a77049 --- /dev/null +++ b/consensus/ethash/sealer_test.go @@ -0,0 +1,115 @@ +package ethash + +import ( + "encoding/json" + "io/ioutil" + "math/big" + "net" + "net/http" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Tests whether remote HTTP servers are correctly notified of new work. +func TestRemoteNotify(t *testing.T) { + // Start a simple webserver to capture notifications + sink := make(chan [3]string) + + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to read miner notification: %v", err) + } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Fatalf("failed to unmarshal miner notification: %v", err) + } + sink <- work + }), + } + // Open a custom listener to extract its local address + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to open notification server: %v", err) + } + defer listener.Close() + + go server.Serve(listener) + + // Create the custom ethash engine + ethash := NewTester([]string{"http://" + listener.Addr().String()}) + defer ethash.Close() + + // Stream a work task and ensure the notification bubbles out + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) + + ethash.Seal(nil, block, nil) + select { + case work := <-sink: + if want := header.HashNoNonce().Hex(); work[0] != want { + t.Errorf("work packet hash mismatch: have %s, want %s", work[0], want) + } + if want := common.BytesToHash(SeedHash(header.Number.Uint64())).Hex(); work[1] != want { + t.Errorf("work packet seed mismatch: have %s, want %s", work[1], want) + } + target := new(big.Int).Div(new(big.Int).Lsh(big.NewInt(1), 256), header.Difficulty) + if want := common.BytesToHash(target.Bytes()).Hex(); work[2] != want { + t.Errorf("work packet target mismatch: have %s, want %s", work[2], want) + } + case <-time.After(time.Second): + t.Fatalf("notification timed out") + } +} + +// Tests that pushing work packages fast to the miner doesn't cause any daa race +// issues in the notifications. +func TestRemoteMultiNotify(t *testing.T) { + // Start a simple webserver to capture notifications + sink := make(chan [3]string, 1024) + + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to read miner notification: %v", err) + } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Fatalf("failed to unmarshal miner notification: %v", err) + } + sink <- work + }), + } + // Open a custom listener to extract its local address + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to open notification server: %v", err) + } + defer listener.Close() + + go server.Serve(listener) + + // Create the custom ethash engine + ethash := NewTester([]string{"http://" + listener.Addr().String()}) + defer ethash.Close() + + // Stream a lot of work task and ensure all the notifications bubble out + for i := 0; i < cap(sink); i++ { + header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) + + ethash.Seal(nil, block, nil) + } + for i := 0; i < cap(sink); i++ { + select { + case <-sink: + case <-time.After(250 * time.Millisecond): + t.Fatalf("notification %d timed out", i) + } + } +} diff --git a/eth/backend.go b/eth/backend.go index 32946a0ab..865534b19 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -124,7 +124,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { chainConfig: chainConfig, eventMux: ctx.EventMux, accountManager: ctx.AccountManager, - engine: CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb), + engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb), shutdownChan: make(chan bool), networkID: config.NetworkId, gasPrice: config.GasPrice, @@ -210,7 +210,7 @@ func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Data } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service -func CreateConsensusEngine(ctx *node.ServiceContext, config *ethash.Config, chainConfig *params.ChainConfig, db ethdb.Database) consensus.Engine { +func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, db ethdb.Database) consensus.Engine { // If proof-of-authority is requested, set it up if chainConfig.Clique != nil { return clique.New(chainConfig.Clique, db) @@ -222,7 +222,7 @@ func CreateConsensusEngine(ctx *node.ServiceContext, config *ethash.Config, chai return ethash.NewFaker() case ethash.ModeTest: log.Warn("Ethash used in test mode") - return ethash.NewTester() + return ethash.NewTester(nil) case ethash.ModeShared: log.Warn("Ethash used in shared mode") return ethash.NewShared() @@ -234,7 +234,7 @@ func CreateConsensusEngine(ctx *node.ServiceContext, config *ethash.Config, chai DatasetDir: config.DatasetDir, DatasetsInMem: config.DatasetsInMem, DatasetsOnDisk: config.DatasetsOnDisk, - }) + }, notify) engine.SetThreads(-1) // Disable CPU mining return engine } diff --git a/eth/config.go b/eth/config.go index 426d2bf1e..0c82f2923 100644 --- a/eth/config.go +++ b/eth/config.go @@ -97,6 +97,7 @@ type Config struct { // Mining-related options Etherbase common.Address `toml:",omitempty"` MinerThreads int `toml:",omitempty"` + MinerNotify []string `toml:",omitempty"` ExtraData []byte `toml:",omitempty"` GasPrice *big.Int diff --git a/les/backend.go b/les/backend.go index 952d92cc2..178bc1e0e 100644 --- a/les/backend.go +++ b/les/backend.go @@ -102,7 +102,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { peers: peers, reqDist: newRequestDistributor(peers, quitSync), accountManager: ctx.AccountManager, - engine: eth.CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb), + engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb), shutdownChan: make(chan bool), networkId: config.NetworkId, bloomRequests: make(chan chan *bloombits.Retrieval),