lotus/lotuspond/outmux.go

131 lines
2.3 KiB
Go
Raw Normal View History

2019-08-19 20:16:27 +00:00
package main
import (
2019-09-16 16:51:14 +00:00
"bufio"
2019-08-19 20:16:27 +00:00
"fmt"
"github.com/gorilla/websocket"
"github.com/opentracing/opentracing-go/log"
"io"
"net/http"
"strings"
"sync"
)
type outmux struct {
lk sync.Mutex
errpw *io.PipeWriter
outpw *io.PipeWriter
errpr *io.PipeReader
outpr *io.PipeReader
2019-08-19 22:38:32 +00:00
n uint64
2019-08-19 20:16:27 +00:00
outs map[uint64]*websocket.Conn
2019-08-19 22:38:32 +00:00
new chan *websocket.Conn
2019-08-19 20:16:27 +00:00
stop chan struct{}
}
func newWsMux() *outmux {
out := &outmux{
2019-08-19 22:38:32 +00:00
n: 0,
2019-08-19 20:16:27 +00:00
outs: map[uint64]*websocket.Conn{},
2019-08-19 22:38:32 +00:00
new: make(chan *websocket.Conn),
stop: make(chan struct{}),
2019-08-19 20:16:27 +00:00
}
out.outpr, out.outpw = io.Pipe()
out.errpr, out.errpw = io.Pipe()
go out.run()
return out
}
func (m *outmux) msgsToChan(r *io.PipeReader, ch chan []byte) {
defer close(ch)
2019-09-16 16:51:14 +00:00
br := bufio.NewReader(r)
2019-08-19 20:16:27 +00:00
for {
2019-09-16 16:51:14 +00:00
buf, _, err := br.ReadLine()
2019-08-19 20:16:27 +00:00
if err != nil {
return
}
2019-09-16 16:51:14 +00:00
out := make([]byte, len(buf) + 1)
copy(out, buf)
out[len(out) - 1] = '\n'
2019-08-19 20:16:27 +00:00
select {
2019-09-16 16:51:14 +00:00
case ch <- out:
2019-08-19 20:16:27 +00:00
case <-m.stop:
return
}
}
}
// run is the mux's event loop. It starts one reader goroutine per pipe and
// then, until m.stop yields a value:
//
//   - forwards every line read from either pipe to all registered connections,
//   - registers connections arriving on m.new under the next value of m.n.
//
// When m.stop fires, every registered connection is closed and run returns.
func (m *outmux) run() {
	stdout := make(chan []byte)
	stderr := make(chan []byte)
	go m.msgsToChan(m.outpr, stdout)
	go m.msgsToChan(m.errpr, stderr)

	for {
		select {
		case msg, ok := <-stdout:
			if !ok {
				// The reader goroutine has finished and closed the channel.
				// A closed channel is always ready, so set it to nil (never
				// ready) to avoid spinning and broadcasting empty messages.
				stdout = nil
				continue
			}
			m.broadcast(msg)
		case msg, ok := <-stderr:
			if !ok {
				stderr = nil
				continue
			}
			m.broadcast(msg)
		case c := <-m.new:
			m.n++
			m.outs[m.n] = c
		case <-m.stop:
			for _, out := range m.outs {
				out.Close()
			}
			return
		}
	}
}

// broadcast writes msg as a binary websocket message to every registered
// connection. A connection whose write fails is closed and unregistered.
func (m *outmux) broadcast(msg []byte) {
	for k, out := range m.outs {
		if err := out.WriteMessage(websocket.BinaryMessage, msg); err != nil {
			out.Close()
			fmt.Printf("outmux write failed: %s\n", err)
			// Deleting the current key while ranging over a map is safe in Go.
			delete(m.outs, k)
		}
	}
}
// upgrader performs the websocket handshake for ServeHTTP.
//
// Its CheckOrigin callback returns true unconditionally, so the handshake is
// accepted regardless of the request's Origin.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
func (m *outmux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Connection"), "Upgrade") {
fmt.Println("noupgrade")
w.WriteHeader(500)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
m.new <- c
return
2019-08-19 22:38:32 +00:00
}