lotus/lotuspond/outmux.go
2019-12-04 21:14:19 -08:00

128 lines
2.3 KiB
Go

package main
import (
"bufio"
"fmt"
"io"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/opentracing/opentracing-go/log"
)
// outmux fans line-oriented output from two in-process pipes out to every
// connected websocket client.
type outmux struct {
	// Write ends of the stderr/stdout pipes created in newWsMux.
	// NOTE(review): the writers are not used in this file; presumably a
	// child process's output is attached to them elsewhere — confirm.
	errpw, outpw *io.PipeWriter

	// Read ends of the same pipes; run hands them to msgsToChan.
	errpr, outpr *io.PipeReader

	// n is incremented for every accepted connection and used as its key
	// in outs.
	n uint64

	// outs holds the live client connections. It is only touched from the
	// run goroutine.
	outs map[uint64]*websocket.Conn

	// new carries freshly upgraded connections from ServeHTTP to run.
	new chan *websocket.Conn

	// stop makes run and msgsToChan return once a receive on it succeeds.
	stop chan struct{}
}
// newWsMux allocates an outmux with fresh stdout/stderr pipes and an empty
// client set, starts its run loop in a new goroutine, and returns it.
func newWsMux() *outmux {
	stdoutR, stdoutW := io.Pipe()
	stderrR, stderrW := io.Pipe()

	mux := &outmux{
		errpw: stderrW,
		outpw: stdoutW,
		errpr: stderrR,
		outpr: stdoutR,
		outs:  make(map[uint64]*websocket.Conn),
		new:   make(chan *websocket.Conn),
		stop:  make(chan struct{}),
	}

	go mux.run()
	return mux
}
// msgsToChan reads r line by line and sends each line, terminated by a single
// '\n', on ch. It returns when reading fails (for example because the pipe was
// closed) or when m.stop fires, and closes ch on the way out so the receiver
// can tell that no more messages will arrive.
//
// Lines longer than the bufio buffer are delivered by ReadLine in several
// fragments; those fragments are reassembled here so that one input line is
// always exactly one message. A line is buffered in memory until its end is
// seen.
func (m *outmux) msgsToChan(r *io.PipeReader, ch chan []byte) {
	defer close(ch)

	// send delivers one message and reports false when the mux is stopping.
	send := func(msg []byte) bool {
		select {
		case ch <- msg:
			return true
		case <-m.stop:
			return false
		}
	}

	br := bufio.NewReader(r)
	var line []byte
	for {
		frag, isPrefix, err := br.ReadLine()
		if err != nil {
			// Do not lose the start of a long line that was already read.
			if len(line) > 0 {
				send(append(line, '\n'))
			}
			return
		}

		// frag aliases the reader's internal buffer and is only valid until
		// the next read, so copy it into our own slice.
		line = append(line, frag...)
		if isPrefix {
			// More of the same line follows; keep accumulating.
			continue
		}

		if !send(append(line, '\n')) {
			return
		}
		// The receiver now owns the slice; start the next line from scratch.
		line = nil
	}
}
// run is the mux's event loop. It starts one reader goroutine per pipe,
// forwards every line they produce to all connected websocket clients,
// registers new clients arriving on m.new, and on m.stop closes all clients
// and returns.
//
// m.outs is only accessed from this goroutine, so it needs no locking.
func (m *outmux) run() {
	stdout := make(chan []byte)
	stderr := make(chan []byte)
	go m.msgsToChan(m.outpr, stdout)
	go m.msgsToChan(m.errpr, stderr)

	// broadcast writes msg to every client and drops the ones whose write
	// fails. Deleting from a map while ranging over it is safe in Go.
	broadcast := func(msg []byte) {
		for k, out := range m.outs {
			if err := out.WriteMessage(websocket.BinaryMessage, msg); err != nil {
				out.Close()
				fmt.Printf("outmux write failed: %s\n", err)
				delete(m.outs, k)
			}
		}
	}

	for {
		select {
		case msg, ok := <-stdout:
			if !ok {
				// msgsToChan closed the channel. A closed channel is always
				// ready, so without this the loop would spin and flood
				// clients with empty messages. A nil channel never fires.
				stdout = nil
				continue
			}
			broadcast(msg)
		case msg, ok := <-stderr:
			if !ok {
				stderr = nil
				continue
			}
			broadcast(msg)
		case c := <-m.new:
			m.n++
			m.outs[m.n] = c
		case <-m.stop:
			for _, out := range m.outs {
				out.Close()
			}
			return
		}
	}
}
// upgrader performs the HTTP-to-websocket handshake for ServeHTTP. Its origin
// check accepts every request, whatever origin it comes from.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// ServeHTTP upgrades the request to a websocket connection and hands that
// connection to the run loop, which will then stream output lines to it.
//
// Requests whose Connection header does not ask for an upgrade are answered
// with status 500. If the mux has already been stopped, the freshly upgraded
// connection is closed instead of being registered.
func (m *outmux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Connection header tokens are case-insensitive, so "upgrade" must be
	// accepted as well as "Upgrade".
	if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") {
		fmt.Println("noupgrade")
		w.WriteHeader(500)
		return
	}

	// NOTE(review): Upgrade is called with a nil responseHeader and takes
	// over the connection, so headers set on w here may never reach the
	// client on a successful upgrade — confirm against the gorilla docs.
	w.Header().Set("Access-Control-Allow-Origin", "*")
	if proto := r.Header.Get("Sec-WebSocket-Protocol"); proto != "" {
		w.Header().Set("Sec-WebSocket-Protocol", proto)
	}

	c, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// writing another status here would be a superfluous WriteHeader.
		//
		// NOTE(review): log is opentracing-go/log; Error only builds a log
		// field and emits nothing. The call is kept so the file's import
		// stays used; the Printf below is what actually reports the failure.
		log.Error(err)
		fmt.Printf("outmux upgrade failed: %s\n", err)
		return
	}

	// run stops receiving from m.new once m.stop fires; without the second
	// case this handler would block forever and leak the connection.
	select {
	case m.new <- c:
	case <-m.stop:
		c.Close()
	}
}