pond: logs endpoint

This commit is contained in:
Łukasz Magiera 2019-08-19 22:16:27 +02:00
parent d2f6105649
commit eb97be8df4
3 changed files with 157 additions and 5 deletions

View File

@ -65,9 +65,11 @@ func (api *api) Spawn() (nodeInfo, error) {
return nodeInfo{}, err
}
mux := newWsMux()
cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = []string{"LOTUS_PATH=" + dir}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
@ -84,7 +86,9 @@ func (api *api) Spawn() (nodeInfo, error) {
cmd: cmd,
meta: info,
mux: mux,
stop: func() {
defer close(mux.stop)
defer errlogfile.Close()
defer logfile.Close()
},
@ -156,9 +160,11 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
time.Sleep(time.Millisecond * 300)
mux := newWsMux()
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
@ -178,7 +184,9 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
cmd: cmd,
meta: info,
mux: mux,
stop: func() {
defer close(mux.stop)
defer errlogfile.Close()
defer logfile.Close()
},

View File

@ -5,6 +5,7 @@ import (
"net/http"
"os"
"os/exec"
"path"
"strconv"
"github.com/filecoin-project/go-lotus/lib/jsonrpc"
@ -18,6 +19,7 @@ type runningNode struct {
cmd *exec.Cmd
meta nodeInfo
mux *outmux
stop func()
}
@ -107,15 +109,33 @@ func nodeById(nodes []nodeInfo, i int) nodeInfo {
panic("no node with this id")
}
func logHandler(api *api) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
id, err := strconv.ParseInt(path.Base(req.URL.Path), 10, 32)
if err != nil {
panic(err)
return
}
api.runningLk.Lock()
n := api.running[int32(id)]
api.runningLk.Unlock()
n.mux.ServeHTTP(w, req)
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "run lotuspond daemon",
Action: func(cctx *cli.Context) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Pond", &api{running: map[int32]runningNode{}})
a := &api{running: map[int32]runningNode{}}
rpcServer.Register("Pond", a)
http.Handle("/", http.FileServer(http.Dir("lotuspond/front/build")))
http.Handle("/rpc/v0", rpcServer)
http.HandleFunc("/logs/", logHandler(a))
fmt.Printf("Listening on http://%s\n", listenAddr)
return http.ListenAndServe(listenAddr, nil)

124
lotuspond/outmux.go Normal file
View File

@ -0,0 +1,124 @@
package main
import (
"fmt"
"github.com/gorilla/websocket"
"github.com/opentracing/opentracing-go/log"
"io"
"net/http"
"strings"
"sync"
)
type outmux struct {
lk sync.Mutex
errpw *io.PipeWriter
outpw *io.PipeWriter
errpr *io.PipeReader
outpr *io.PipeReader
n uint64
outs map[uint64]*websocket.Conn
new chan *websocket.Conn
stop chan struct{}
}
func newWsMux() *outmux {
out := &outmux{
n: 0,
outs: map[uint64]*websocket.Conn{},
new: make(chan *websocket.Conn),
stop: make(chan struct{}),
}
out.outpr, out.outpw = io.Pipe()
out.errpr, out.errpw = io.Pipe()
go out.run()
return out
}
func (m *outmux) msgsToChan(r *io.PipeReader, ch chan []byte) {
defer close(ch)
for {
buf := make([]byte, 1)
n, err := r.Read(buf)
if err != nil {
return
}
select {
case ch <- buf[:n]:
case <-m.stop:
return
}
}
}
func (m *outmux) run() {
stdout := make(chan []byte)
stderr := make(chan []byte)
go m.msgsToChan(m.outpr, stdout)
go m.msgsToChan(m.errpr, stderr)
for {
select {
case msg := <-stdout:
for k, out := range m.outs {
if err := out.WriteMessage(websocket.BinaryMessage, msg); err != nil {
out.Close()
fmt.Printf("outmux write failed: %s\n", err)
delete(m.outs, k)
}
}
case msg := <-stderr:
for k, out := range m.outs {
if err := out.WriteMessage(websocket.BinaryMessage, msg); err != nil {
out.Close()
fmt.Printf("outmux write failed: %s\n", err)
delete(m.outs, k)
}
}
case c := <-m.new:
m.n++
m.outs[m.n] = c
case <-m.stop:
for _, out := range m.outs {
out.Close()
}
return
}
}
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (m *outmux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Connection"), "Upgrade") {
fmt.Println("noupgrade")
w.WriteHeader(500)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
m.new <- c
return
}