setTimeout and sendAsync implemented

added and eval queue for serializing JSRE vm execution
This commit is contained in:
zsfelfoldi 2015-04-22 02:31:59 +02:00
parent 2e9ed6f7aa
commit c54d123b31
4 changed files with 299 additions and 35 deletions

View File

@ -103,6 +103,7 @@ func (js *jsre) apiBindings() {
t, _ := js.re.Get("jeth") t, _ := js.re.Get("jeth")
jethObj := t.Object() jethObj := t.Object()
jethObj.Set("send", jeth.Send) jethObj.Set("send", jeth.Send)
jethObj.Set("sendAsync", jeth.Send)
err := js.re.Compile("bignumber.js", re.BigNumber_JS) err := js.re.Compile("bignumber.js", re.BigNumber_JS)
if err != nil { if err != nil {
@ -172,8 +173,10 @@ func (self *jsre) UnlockAccount(addr []byte) bool {
func (self *jsre) exec(filename string) error { func (self *jsre) exec(filename string) error {
if err := self.re.Exec(filename); err != nil { if err := self.re.Exec(filename); err != nil {
self.re.Stop(false)
return fmt.Errorf("Javascript Error: %v", err) return fmt.Errorf("Javascript Error: %v", err)
} }
self.re.Stop(true)
return nil return nil
} }
@ -201,6 +204,7 @@ func (self *jsre) interactive() {
if self.atexit != nil { if self.atexit != nil {
self.atexit() self.atexit()
} }
self.re.Stop(false)
} }
func (self *jsre) withHistory(op func(*os.File)) { func (self *jsre) withHistory(op func(*os.File)) {

View File

@ -3,10 +3,11 @@ package jsre
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"sync"
"github.com/robertkrimen/otto" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/robertkrimen/otto"
) )
/* /*
@ -20,59 +21,261 @@ It provides some helper functions to
type JSRE struct { type JSRE struct {
assetPath string assetPath string
vm *otto.Otto vm *otto.Otto
evalQueue chan *evalReq
stopEventLoop chan bool
loopWg sync.WaitGroup
} }
// jsTimer is a single timer instance with a callback function
type jsTimer struct {
timer *time.Timer
duration time.Duration
interval bool
call otto.FunctionCall
}
// evalResult is a structure to store the result of any serialized vm execution
type evalResult struct {
result otto.Value
err error
}
// evalReq is a serialized vm execution request put in evalQueue and processed by runEventLoop
type evalReq struct {
fn func(res *evalResult)
done chan bool
res evalResult
}
// runtime must be stopped with Stop() after use and cannot be used after stopping
func New(assetPath string) *JSRE { func New(assetPath string) *JSRE {
re := &JSRE{ re := &JSRE{
assetPath, assetPath: assetPath,
otto.New(), vm: otto.New(),
} }
// load prettyprint func definition // load prettyprint func definition
re.vm.Run(pp_js) re.vm.Run(pp_js)
re.vm.Set("loadScript", re.loadScript) re.vm.Set("loadScript", re.loadScript)
re.evalQueue = make(chan *evalReq)
re.stopEventLoop = make(chan bool)
re.loopWg.Add(1)
go re.runEventLoop()
return re return re
} }
// this function runs a piece of JS code either in a serialized way (when useEQ is true) or instantly, circumventing the evalQueue
func (self *JSRE) run(src interface{}, useEQ bool) (value otto.Value, err error) {
if useEQ {
done := make(chan bool)
req := &evalReq{
fn: func(res *evalResult) {
res.result, res.err = self.vm.Run(src)
},
done: done,
}
self.evalQueue <- req
<-done
return req.res.result, req.res.err
} else {
return self.vm.Run(src)
}
}
/*
This function runs the main event loop from a goroutine that is started
when JSRE is created. Use Stop() before exiting to properly stop it.
The event loop processes vm access requests from the evalQueue in a
serialized way and calls timer callback functions at the appropriate time.
Exported functions always access the vm through the event queue. You can
call the functions of the otto vm directly to circumvent the queue. These
functions should be used if and only if running a routine that was already
called from JS through an RPC call.
*/
func (self *JSRE) runEventLoop() {
registry := map[*jsTimer]*jsTimer{}
ready := make(chan *jsTimer)
newTimer := func(call otto.FunctionCall, interval bool) (*jsTimer, otto.Value) {
delay, _ := call.Argument(1).ToInteger()
if 0 >= delay {
delay = 1
}
timer := &jsTimer{
duration: time.Duration(delay) * time.Millisecond,
call: call,
interval: interval,
}
registry[timer] = timer
timer.timer = time.AfterFunc(timer.duration, func() {
ready <- timer
})
value, err := call.Otto.ToValue(timer)
if err != nil {
panic(err)
}
return timer, value
}
setTimeout := func(call otto.FunctionCall) otto.Value {
_, value := newTimer(call, false)
return value
}
setInterval := func(call otto.FunctionCall) otto.Value {
_, value := newTimer(call, true)
return value
}
clearTimeout := func(call otto.FunctionCall) otto.Value {
timer, _ := call.Argument(0).Export()
if timer, ok := timer.(*jsTimer); ok {
timer.timer.Stop()
delete(registry, timer)
}
return otto.UndefinedValue()
}
var waitForCallbacks bool
loop:
for {
select {
case timer := <-ready:
// execute callback, remove/reschedule the timer
var arguments []interface{}
if len(timer.call.ArgumentList) > 2 {
tmp := timer.call.ArgumentList[2:]
arguments = make([]interface{}, 2+len(tmp))
for i, value := range tmp {
arguments[i+2] = value
}
} else {
arguments = make([]interface{}, 1)
}
arguments[0] = timer.call.ArgumentList[0]
_, err := self.vm.Call(`Function.call.call`, nil, arguments...)
if err != nil {
break loop
}
if timer.interval {
timer.timer.Reset(timer.duration)
} else {
delete(registry, timer)
if waitForCallbacks && (len(registry) == 0) {
break loop
}
}
case evalReq := <-self.evalQueue:
// run the code, send the result back
self.vm.Set("setTimeout", setTimeout)
self.vm.Set("setInterval", setInterval)
self.vm.Set("clearTimeout", clearTimeout)
self.vm.Set("clearInterval", clearTimeout)
evalReq.fn(&evalReq.res)
close(evalReq.done)
if waitForCallbacks && (len(registry) == 0) {
break loop
}
case waitForCallbacks = <-self.stopEventLoop:
if !waitForCallbacks || (len(registry) == 0) {
break loop
}
}
}
for _, timer := range registry {
timer.timer.Stop()
delete(registry, timer)
}
self.loopWg.Done()
}
// stops the event loop before exit, optionally waits for all timers to expire
func (self *JSRE) Stop(waitForCallbacks bool) {
self.stopEventLoop <- waitForCallbacks
self.loopWg.Wait()
}
// Exec(file) loads and runs the contents of a file // Exec(file) loads and runs the contents of a file
// if a relative path is given, the jsre's assetPath is used // if a relative path is given, the jsre's assetPath is used
func (self *JSRE) Exec(file string) error { func (self *JSRE) Exec(file string) error {
return self.exec(common.AbsolutePath(self.assetPath, file)) return self.exec(common.AbsolutePath(self.assetPath, file), true)
} }
func (self *JSRE) exec(path string) error { // circumvents the eval queue, see runEventLoop
func (self *JSRE) execWithoutEQ(file string) error {
return self.exec(common.AbsolutePath(self.assetPath, file), false)
}
func (self *JSRE) exec(path string, useEQ bool) error {
code, err := ioutil.ReadFile(path) code, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
return err return err
} }
_, err = self.vm.Run(code) _, err = self.run(code, useEQ)
return err return err
} }
// assigns value v to a variable in the JS environment
func (self *JSRE) Bind(name string, v interface{}) (err error) { func (self *JSRE) Bind(name string, v interface{}) (err error) {
self.vm.Set(name, v) self.Set(name, v)
return return
} }
// runs a piece of JS code
func (self *JSRE) Run(code string) (otto.Value, error) { func (self *JSRE) Run(code string) (otto.Value, error) {
return self.vm.Run(code) return self.run(code, true)
} }
// returns the value of a variable in the JS environment
func (self *JSRE) Get(ns string) (otto.Value, error) { func (self *JSRE) Get(ns string) (otto.Value, error) {
return self.vm.Get(ns) done := make(chan bool)
req := &evalReq{
fn: func(res *evalResult) {
res.result, res.err = self.vm.Get(ns)
},
done: done,
}
self.evalQueue <- req
<-done
return req.res.result, req.res.err
} }
// assigns value v to a variable in the JS environment
func (self *JSRE) Set(ns string, v interface{}) error { func (self *JSRE) Set(ns string, v interface{}) error {
return self.vm.Set(ns, v) done := make(chan bool)
req := &evalReq{
fn: func(res *evalResult) {
res.err = self.vm.Set(ns, v)
},
done: done,
}
self.evalQueue <- req
<-done
return req.res.err
} }
/*
Executes a JS script from inside the currently executing JS code.
Should only be called from inside an RPC routine.
*/
func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value { func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value {
file, err := call.Argument(0).ToString() file, err := call.Argument(0).ToString()
if err != nil { if err != nil {
return otto.FalseValue() return otto.FalseValue()
} }
if err := self.Exec(file); err != nil { if err := self.execWithoutEQ(file); err != nil { // loadScript is only called from inside js
fmt.Println("err:", err) fmt.Println("err:", err)
return otto.FalseValue() return otto.FalseValue()
} }
@ -80,6 +283,7 @@ func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value {
return otto.TrueValue() return otto.TrueValue()
} }
// uses the "prettyPrint" JS function to format a value
func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) { func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) {
var method otto.Value var method otto.Value
v, err = self.vm.ToValue(v) v, err = self.vm.ToValue(v)
@ -93,6 +297,7 @@ func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) {
return method.Call(method, v) return method.Call(method, v)
} }
// creates an otto value from a go type
func (self *JSRE) ToVal(v interface{}) otto.Value { func (self *JSRE) ToVal(v interface{}) otto.Value {
result, err := self.vm.ToValue(v) result, err := self.vm.ToValue(v)
if err != nil { if err != nil {
@ -102,6 +307,7 @@ func (self *JSRE) ToVal(v interface{}) otto.Value {
return result return result
} }
// evaluates JS function and returns result in a pretty printed string format
func (self *JSRE) Eval(code string) (s string, err error) { func (self *JSRE) Eval(code string) (s string, err error) {
var val otto.Value var val otto.Value
val, err = self.Run(code) val, err = self.Run(code)
@ -115,11 +321,12 @@ func (self *JSRE) Eval(code string) (s string, err error) {
return fmt.Sprintf("%v", val), nil return fmt.Sprintf("%v", val), nil
} }
// compiles and then runs a piece of JS code
func (self *JSRE) Compile(fn string, src interface{}) error { func (self *JSRE) Compile(fn string, src interface{}) error {
script, err := self.vm.Compile(fn, src) script, err := self.vm.Compile(fn, src)
if err != nil { if err != nil {
return err return err
} }
self.vm.Run(script) self.run(script, true)
return nil return nil
} }

View File

@ -5,6 +5,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
) )
type testNativeObjectBinding struct { type testNativeObjectBinding struct {
@ -43,6 +44,31 @@ func TestExec(t *testing.T) {
if exp != got { if exp != got {
t.Errorf("expected '%v', got '%v'", exp, got) t.Errorf("expected '%v', got '%v'", exp, got)
} }
jsre.Stop(false)
}
func TestNatto(t *testing.T) {
jsre := New("/tmp")
ioutil.WriteFile("/tmp/test.js", []byte(`setTimeout(function(){msg = "testMsg"}, 1);`), os.ModePerm)
err := jsre.Exec("test.js")
if err != nil {
t.Errorf("expected no error, got %v", err)
}
time.Sleep(time.Millisecond * 10)
val, err := jsre.Run("msg")
if err != nil {
t.Errorf("expected no error, got %v", err)
}
if !val.IsString() {
t.Errorf("expected string value, got %v", val)
}
exp := "testMsg"
got, _ := val.ToString()
if exp != got {
t.Errorf("expected '%v', got '%v'", exp, got)
}
jsre.Stop(false)
} }
func TestBind(t *testing.T) { func TestBind(t *testing.T) {
@ -59,6 +85,7 @@ func TestBind(t *testing.T) {
t.Errorf("expected no error, got %v", err) t.Errorf("expected no error, got %v", err)
} }
t.Logf("no: %v", pp) t.Logf("no: %v", pp)
jsre.Stop(false)
} }
func TestLoadScript(t *testing.T) { func TestLoadScript(t *testing.T) {
@ -81,4 +108,5 @@ func TestLoadScript(t *testing.T) {
if exp != got { if exp != got {
t.Errorf("expected '%v', got '%v'", exp, got) t.Errorf("expected '%v', got '%v'", exp, got)
} }
jsre.Stop(false)
} }

View File

@ -2,8 +2,7 @@ package rpc
import ( import (
"encoding/json" "encoding/json"
"fmt"
// "fmt"
"github.com/ethereum/go-ethereum/jsre" "github.com/ethereum/go-ethereum/jsre"
"github.com/robertkrimen/otto" "github.com/robertkrimen/otto"
) )
@ -18,12 +17,12 @@ func NewJeth(ethApi *EthereumApi, toVal func(interface{}) otto.Value, re *jsre.J
return &Jeth{ethApi, toVal, re} return &Jeth{ethApi, toVal, re}
} }
func (self *Jeth) err(code int, msg string, id interface{}) (response otto.Value) { func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) {
rpcerr := &RpcErrorObject{code, msg} rpcerr := &RpcErrorObject{code, msg}
self.re.Set("ret_jsonrpc", jsonrpcver) call.Otto.Set("ret_jsonrpc", jsonrpcver)
self.re.Set("ret_id", id) call.Otto.Set("ret_id", id)
self.re.Set("ret_error", rpcerr) call.Otto.Set("ret_error", rpcerr)
response, _ = self.re.Run(` response, _ = call.Otto.Run(`
ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, error: ret_error }; ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, error: ret_error };
`) `)
return return
@ -32,27 +31,53 @@ func (self *Jeth) err(code int, msg string, id interface{}) (response otto.Value
func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) {
reqif, err := call.Argument(0).Export() reqif, err := call.Argument(0).Export()
if err != nil { if err != nil {
return self.err(-32700, err.Error(), nil) return self.err(call, -32700, err.Error(), nil)
} }
jsonreq, err := json.Marshal(reqif) jsonreq, err := json.Marshal(reqif)
var req RpcRequest var reqs []RpcRequest
err = json.Unmarshal(jsonreq, &req) batch := true
err = json.Unmarshal(jsonreq, &reqs)
var respif interface{}
err = self.ethApi.GetRequestReply(&req, &respif)
if err != nil { if err != nil {
fmt.Printf("error: %s\n", err) reqs = make([]RpcRequest, 1)
return self.err(-32603, err.Error(), req.Id) err = json.Unmarshal(jsonreq, &reqs[0])
batch = false
}
call.Otto.Set("response_len", len(reqs))
call.Otto.Run("var ret_response = new Array(response_len);")
for i, req := range reqs {
var respif interface{}
err = self.ethApi.GetRequestReply(&req, &respif)
if err != nil {
return self.err(call, -32603, err.Error(), req.Id)
}
call.Otto.Set("ret_jsonrpc", jsonrpcver)
call.Otto.Set("ret_id", req.Id)
res, _ := json.Marshal(respif)
call.Otto.Set("ret_result", string(res))
call.Otto.Set("response_idx", i)
response, err = call.Otto.Run(`
ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) };
`)
}
if !batch {
call.Otto.Run("ret_response = ret_response[0];")
}
if call.Argument(1).IsObject() {
call.Otto.Set("callback", call.Argument(1))
call.Otto.Run(`
if (Object.prototype.toString.call(callback) == '[object Function]') {
callback(null, ret_response);
}
`)
} }
self.re.Set("ret_jsonrpc", jsonrpcver)
self.re.Set("ret_id", req.Id)
res, _ := json.Marshal(respif)
self.re.Set("ret_result", string(res))
response, err = self.re.Run(`
ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) };
`)
return return
} }