Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
5 changed files with 193 additions and 63 deletions
Showing only changes of commit 72c2c0ae7e - Show all commits

View File

@ -77,13 +77,13 @@ func localConsole(ctx *cli.Context) error {
// Create and start the node based on the CLI flags
prepare(ctx)
stack, backend := makeFullNode(ctx)
startNode(ctx, stack, backend)
startNode(ctx, stack, backend, true)
defer stack.Close()
// Attach to the newly started node and start the JavaScript console
// Attach to the newly started node and create the JavaScript console.
client, err := stack.Attach()
if err != nil {
utils.Fatalf("Failed to attach to the inproc geth: %v", err)
return fmt.Errorf("Failed to attach to the inproc geth: %v", err)
}
config := console.Config{
DataDir: utils.MakeDataDir(ctx),
@ -91,29 +91,34 @@ func localConsole(ctx *cli.Context) error {
Client: client,
Preload: utils.MakeConsolePreloads(ctx),
}
console, err := console.New(config)
if err != nil {
utils.Fatalf("Failed to start the JavaScript console: %v", err)
return fmt.Errorf("Failed to start the JavaScript console: %v", err)
}
defer console.Stop(false)
// If only a short execution was requested, evaluate and return
// If only a short execution was requested, evaluate and return.
if script := ctx.GlobalString(utils.ExecFlag.Name); script != "" {
console.Evaluate(script)
return nil
}
// Otherwise print the welcome screen and enter interactive mode
// Track node shutdown and stop the console when it goes down.
// This happens when SIGTERM is sent to the process.
go func() {
stack.Wait()
console.StopInteractive()
}()
// Print the welcome screen and enter interactive mode.
console.Welcome()
console.Interactive()
return nil
}
// remoteConsole will connect to a remote geth instance, attaching a JavaScript
// console to it.
func remoteConsole(ctx *cli.Context) error {
// Attach to a remotely running geth instance and start the JavaScript console
endpoint := ctx.Args().First()
if endpoint == "" {
path := node.DefaultDataDir()
@ -150,7 +155,6 @@ func remoteConsole(ctx *cli.Context) error {
Client: client,
Preload: utils.MakeConsolePreloads(ctx),
}
console, err := console.New(config)
if err != nil {
utils.Fatalf("Failed to start the JavaScript console: %v", err)
@ -165,7 +169,6 @@ func remoteConsole(ctx *cli.Context) error {
// Otherwise print the welcome screen and enter interactive mode
console.Welcome()
console.Interactive()
return nil
}
@ -189,13 +192,13 @@ func dialRPC(endpoint string) (*rpc.Client, error) {
func ephemeralConsole(ctx *cli.Context) error {
// Create and start the node based on the CLI flags
stack, backend := makeFullNode(ctx)
startNode(ctx, stack, backend)
startNode(ctx, stack, backend, false)
defer stack.Close()
// Attach to the newly started node and start the JavaScript console
client, err := stack.Attach()
if err != nil {
utils.Fatalf("Failed to attach to the inproc geth: %v", err)
return fmt.Errorf("Failed to attach to the inproc geth: %v", err)
}
config := console.Config{
DataDir: utils.MakeDataDir(ctx),
@ -206,22 +209,24 @@ func ephemeralConsole(ctx *cli.Context) error {
console, err := console.New(config)
if err != nil {
utils.Fatalf("Failed to start the JavaScript console: %v", err)
return fmt.Errorf("Failed to start the JavaScript console: %v", err)
}
defer console.Stop(false)
// Evaluate each of the specified JavaScript files
for _, file := range ctx.Args() {
if err = console.Execute(file); err != nil {
utils.Fatalf("Failed to execute %s: %v", file, err)
}
}
// Interrupt the JS interpreter when node is stopped.
go func() {
stack.Wait()
console.Stop(false)
}()
console.Stop(true)
// Evaluate each of the specified JavaScript files.
for _, file := range ctx.Args() {
if err = console.Execute(file); err != nil {
return fmt.Errorf("Failed to execute %s: %v", file, err)
}
}
// The main script is now done, but keep running timers/callbacks.
console.Stop(true)
return nil
}

View File

@ -320,7 +320,7 @@ func geth(ctx *cli.Context) error {
stack, backend := makeFullNode(ctx)
defer stack.Close()
startNode(ctx, stack, backend)
startNode(ctx, stack, backend, false)
stack.Wait()
return nil
}
@ -328,11 +328,11 @@ func geth(ctx *cli.Context) error {
// startNode boots up the system node and all registered protocols, after which
// it unlocks any requested accounts, and starts the RPC/IPC interfaces and the
// miner.
func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isConsole bool) {
debug.Memsize.Add("node", stack)
// Start up the node itself
utils.StartNode(ctx, stack)
utils.StartNode(ctx, stack, isConsole)
// Unlock any account specifically requested
unlockAccounts(ctx, stack)

View File

@ -68,7 +68,7 @@ func Fatalf(format string, args ...interface{}) {
os.Exit(1)
}
func StartNode(ctx *cli.Context, stack *node.Node) {
func StartNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
@ -87,17 +87,33 @@ func StartNode(ctx *cli.Context, stack *node.Node) {
go monitorFreeDiskSpace(sigc, stack.InstanceDir(), uint64(minFreeDiskSpace)*1024*1024)
}
<-sigc
log.Info("Got interrupt, shutting down...")
go stack.Close()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
shutdown := func() {
log.Info("Got interrupt, shutting down...")
go stack.Close()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
debug.Exit() // ensure trace and CPU profile data is flushed.
debug.LoudPanic("boom")
}
if isConsole {
// In JS console mode, SIGINT is ignored because it's handled by the console.
// However, SIGTERM still shuts down the node.
for {
sig := <-sigc
if sig == syscall.SIGTERM {
shutdown()
return
}
}
} else {
<-sigc
shutdown()
}
debug.Exit() // ensure trace and CPU profile data is flushed.
debug.LoudPanic("boom")
}()
}

View File

@ -17,6 +17,7 @@
package console
import (
"errors"
"fmt"
"io"
"io/ioutil"
@ -26,6 +27,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"syscall"
"github.com/dop251/goja"
@ -74,6 +76,13 @@ type Console struct {
histPath string // Absolute path to the console scrollback history
history []string // Scroll history maintained by the console
printer io.Writer // Output writer to serialize any display strings to
interactiveStopped chan struct{}
stopInteractiveCh chan struct{}
signalReceived chan struct{}
stopped chan struct{}
wg sync.WaitGroup
stopOnce sync.Once
}
// New initializes a JavaScript interpreted runtime environment and sets defaults
@ -92,12 +101,16 @@ func New(config Config) (*Console, error) {
// Initialize the console and return
console := &Console{
client: config.Client,
jsre: jsre.New(config.DocRoot, config.Printer),
prompt: config.Prompt,
prompter: config.Prompter,
printer: config.Printer,
histPath: filepath.Join(config.DataDir, HistoryFile),
client: config.Client,
jsre: jsre.New(config.DocRoot, config.Printer),
prompt: config.Prompt,
prompter: config.Prompter,
printer: config.Printer,
histPath: filepath.Join(config.DataDir, HistoryFile),
interactiveStopped: make(chan struct{}),
stopInteractiveCh: make(chan struct{}),
signalReceived: make(chan struct{}, 1),
stopped: make(chan struct{}),
}
if err := os.MkdirAll(config.DataDir, 0700); err != nil {
return nil, err
@ -105,6 +118,10 @@ func New(config Config) (*Console, error) {
if err := console.init(config.Preload); err != nil {
return nil, err
}
console.wg.Add(1)
go console.interruptHandler()
return console, nil
}
@ -337,9 +354,63 @@ func (c *Console) Evaluate(statement string) {
}
}()
c.jsre.Evaluate(statement, c.printer)
// Avoid exiting Interactive when jsre was interrupted by SIGINT.
c.clearSignalReceived()
}
// Interactive starts an interactive user session, where input is propted from
// interruptHandler runs in its own goroutine and waits for signals.
// When a signal is received, it interrupts the JS interpreter.
func (c *Console) interruptHandler() {
defer c.wg.Done()
// During Interactive, liner inhibits the signal while it is prompting for
// input. However, the signal will be received while evaluating JS.
//
// On unsupported terminals, SIGINT can also happen while prompting.
// Unfortunately, it is not possible to abort the prompt in this case and
// the c.readLines goroutine leaks.
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT)
defer signal.Stop(sig)
for {
select {
case <-sig:
c.setSignalReceived()
c.jsre.Interrupt(errors.New("interrupted"))
case <-c.stopInteractiveCh:
close(c.interactiveStopped)
c.jsre.Interrupt(errors.New("interrupted"))
case <-c.stopped:
return
}
}
}
func (c *Console) setSignalReceived() {
select {
case c.signalReceived <- struct{}{}:
default:
}
}
func (c *Console) clearSignalReceived() {
select {
case <-c.signalReceived:
default:
}
}
// StopInteractive causes Interactive to return as soon as possible.
func (c *Console) StopInteractive() {
select {
case c.stopInteractiveCh <- struct{}{}:
case <-c.stopped:
}
}
// Interactive starts an interactive user session, where in.put is propted from
// the configured user prompter.
func (c *Console) Interactive() {
var (
@ -349,15 +420,11 @@ func (c *Console) Interactive() {
inputLine = make(chan string, 1) // receives user input
inputErr = make(chan error, 1) // receives liner errors
requestLine = make(chan string) // requests a line of input
interrupt = make(chan os.Signal, 1)
)
// Monitor Ctrl-C. While liner does turn on the relevant terminal mode bits to avoid
// the signal, a signal can still be received for unsupported terminals. Unfortunately
// there is no way to cancel the line reader when this happens. The readLines
// goroutine will be leaked in this case.
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(interrupt)
defer func() {
c.writeHistory()
}()
// The line reader runs in a separate goroutine.
go c.readLines(inputLine, inputErr, requestLine)
@ -368,7 +435,14 @@ func (c *Console) Interactive() {
requestLine <- prompt
select {
case <-interrupt:
case <-c.interactiveStopped:
fmt.Fprintln(c.printer, "node is down, exiting console")
return
case <-c.signalReceived:
// SIGINT received while prompting for input -> unsupported terminal.
// I'm not sure if the best choice would be to leave the console running here.
// Bash keeps running in this case. node.js does not.
fmt.Fprintln(c.printer, "caught interrupt, exiting")
return
@ -476,12 +550,19 @@ func (c *Console) Execute(path string) error {
// Stop cleans up the console and terminates the runtime environment.
func (c *Console) Stop(graceful bool) error {
if err := ioutil.WriteFile(c.histPath, []byte(strings.Join(c.history, "\n")), 0600); err != nil {
return err
}
if err := os.Chmod(c.histPath, 0600); err != nil { // Force 0600, even if it was different previously
return err
}
c.stopOnce.Do(func() {
// Stop the interrupt handler.
close(c.stopped)
c.wg.Wait()
})
c.jsre.Stop(graceful)
return nil
}
func (c *Console) writeHistory() error {
if err := ioutil.WriteFile(c.histPath, []byte(strings.Join(c.history, "\n")), 0600); err != nil {
return err
}
return os.Chmod(c.histPath, 0600) // Force 0600, even if it was different previously
}

View File

@ -20,6 +20,7 @@ package jsre
import (
crand "crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
@ -220,19 +221,33 @@ loop:
}
// Do executes the given function on the JS event loop.
// When the runtime is stopped, fn will not execute.
func (re *JSRE) Do(fn func(*goja.Runtime)) {
done := make(chan bool)
req := &evalReq{fn, done}
re.evalQueue <- req
<-done
select {
case re.evalQueue <- req:
<-done
case <-re.closed:
}
}
// stops the event loop before exit, optionally waits for all timers to expire
// Stop terminates the event loop, optionally waiting for all timers to expire.
func (re *JSRE) Stop(waitForCallbacks bool) {
select {
case <-re.closed:
case re.stopEventLoop <- waitForCallbacks:
<-re.closed
timeout := time.NewTimer(10 * time.Millisecond)
defer timeout.Stop()
for {
select {
case <-re.closed:
return
case re.stopEventLoop <- waitForCallbacks:
<-re.closed
return
case <-timeout.C:
// JS is blocked, interrupt and try again.
re.vm.Interrupt(errors.New("JS runtime stopped"))
}
}
}
@ -282,6 +297,19 @@ func (re *JSRE) Evaluate(code string, w io.Writer) {
})
}
// Interrupt stops the current JS evaluation.
func (re *JSRE) Interrupt(v interface{}) {
done := make(chan bool)
noop := func(*goja.Runtime) {}
select {
case re.evalQueue <- &evalReq{noop, done}:
// event loop is not blocked.
default:
re.vm.Interrupt(v)
}
}
// Compile compiles and then runs a piece of JS code.
func (re *JSRE) Compile(filename string, src string) (err error) {
re.Do(func(vm *goja.Runtime) { _, err = compileAndRun(vm, filename, src) })