Merge pull request #1657 from filecoin-project/fix/worker-disconnect

workers: Handle disconnecting workers more gracefully
This commit is contained in:
Łukasz Magiera 2020-05-01 23:10:52 +02:00 committed by GitHub
commit 584a4aba4c
11 changed files with 187 additions and 47 deletions

View File

@ -22,4 +22,6 @@ type WorkerApi interface {
storage.Sealer
Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error
Closing(context.Context) (<-chan struct{}, error)
}

View File

@ -227,6 +227,8 @@ type WorkerStruct struct {
FinalizeSector func(context.Context, abi.SectorID) error `perm:"admin"`
Fetch func(context.Context, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
Closing func(context.Context) (<-chan struct{}, error) `perm:"admin"`
}
}
@ -816,6 +818,10 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id abi.SectorID, fileType stor
return w.Internal.Fetch(ctx, id, fileType, b)
}
func (w *WorkerStruct) Closing(ctx context.Context) (<-chan struct{}, error) {
return w.Internal.Closing(ctx)
}
var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{}

View File

@ -142,9 +142,9 @@ var infoCmd = &cli.Command{
},
}
type stateMeta struct{
i int
col color.Attribute
type stateMeta struct {
i int
col color.Attribute
state sealing.SectorState
}
@ -178,8 +178,8 @@ var stateList = []stateMeta{
func init() {
for i, state := range stateList {
stateOrder[state.state] = stateMeta{
i: i,
col: state.col,
i: i,
col: state.col,
}
}
}

View File

@ -142,22 +142,21 @@ var storageListCmd = &cli.Command{
return err
}
sorted := make([]struct {
type fsInfo struct {
stores.ID
sectors []stores.Decl
stat stores.FsStat
}, 0, len(st))
stat stores.FsStat
}
sorted := make([]fsInfo, 0, len(st))
for id, decls := range st {
st, err := nodeApi.StorageStat(ctx, id)
if err != nil {
return err
sorted = append(sorted, fsInfo{ID: id, sectors: decls})
continue
}
sorted = append(sorted, struct {
stores.ID
sectors []stores.Decl
stat stores.FsStat
}{id, decls, st})
sorted = append(sorted, fsInfo{id, decls, st})
}
sort.Slice(sorted, func(i, j int) bool {
@ -178,14 +177,17 @@ var storageListCmd = &cli.Command{
}
}
fmt.Printf("%s:\n", s.ID)
pingStart := time.Now()
st, err := nodeApi.StorageStat(ctx, s.ID)
if err != nil {
return err
fmt.Printf("\t%s: %s:\n", color.RedString("Error"), err)
continue
}
ping := time.Now().Sub(pingStart)
usedPercent := (st.Capacity-st.Available)*100/st.Capacity
usedPercent := (st.Capacity - st.Available) * 100 / st.Capacity
percCol := color.FgGreen
switch {
@ -196,10 +198,9 @@ var storageListCmd = &cli.Command{
}
var barCols = uint64(50)
set := (st.Capacity-st.Available)*barCols/st.Capacity
set := (st.Capacity - st.Available) * barCols / st.Capacity
bar := strings.Repeat("|", int(set)) + strings.Repeat(" ", int(barCols-set))
fmt.Printf("%s:\n", s.ID)
fmt.Printf("\t[%s] %s/%s %s\n", color.New(percCol).Sprint(bar),
types.SizeStr(types.NewInt(st.Capacity-st.Available)),
types.SizeStr(types.NewInt(st.Capacity)),

View File

@ -66,37 +66,36 @@ var workersListCmd = &cli.Command{
gpuUse = ""
}
fmt.Printf("Worker %d, host %s\n", stat.id, color.MagentaString(stat.Info.Hostname))
var barCols = uint64(64)
cpuBars := int(stat.CpuUse * barCols / stat.Info.Resources.CPUs)
cpuBar := strings.Repeat("|", cpuBars) + strings.Repeat(" ", int(barCols) - cpuBars)
cpuBar := strings.Repeat("|", cpuBars) + strings.Repeat(" ", int(barCols)-cpuBars)
fmt.Printf("\tCPU: [%s] %d core(s) in use\n", color.GreenString(cpuBar), stat.CpuUse)
ramBarsRes := int(stat.Info.Resources.MemReserved*barCols/stat.Info.Resources.MemPhysical)
ramBarsUsed := int(stat.MemUsedMin*barCols/stat.Info.Resources.MemPhysical)
ramBarsRes := int(stat.Info.Resources.MemReserved * barCols / stat.Info.Resources.MemPhysical)
ramBarsUsed := int(stat.MemUsedMin * barCols / stat.Info.Resources.MemPhysical)
ramBar := color.YellowString(strings.Repeat("|", ramBarsRes)) +
color.GreenString(strings.Repeat("|", ramBarsUsed)) +
strings.Repeat(" ", int(barCols) - ramBarsUsed - ramBarsRes)
strings.Repeat(" ", int(barCols)-ramBarsUsed-ramBarsRes)
vmem := stat.Info.Resources.MemPhysical+stat.Info.Resources.MemSwap
vmem := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemBarsRes := int(stat.Info.Resources.MemReserved*barCols/vmem)
vmemBarsUsed := int(stat.MemUsedMax*barCols/vmem)
vmemBarsRes := int(stat.Info.Resources.MemReserved * barCols / vmem)
vmemBarsUsed := int(stat.MemUsedMax * barCols / vmem)
vmemBar := color.YellowString(strings.Repeat("|", vmemBarsRes)) +
color.GreenString(strings.Repeat("|", vmemBarsUsed)) +
strings.Repeat(" ", int(barCols) - vmemBarsUsed - vmemBarsRes)
strings.Repeat(" ", int(barCols)-vmemBarsUsed-vmemBarsRes)
fmt.Printf("\tRAM: [%s] %d%% %s/%s\n", ramBar,
(stat.Info.Resources.MemReserved + stat.MemUsedMin)*100/stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved + stat.MemUsedMin)),
(stat.Info.Resources.MemReserved+stat.MemUsedMin)*100/stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved+stat.MemUsedMin)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
fmt.Printf("\tVMEM: [%s] %d%% %s/%s\n", vmemBar,
(stat.Info.Resources.MemReserved + stat.MemUsedMax)*100/vmem,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved + stat.MemUsedMax)),
(stat.Info.Resources.MemReserved+stat.MemUsedMax)*100/vmem,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved+stat.MemUsedMax)),
types.SizeStr(types.NewInt(vmem)))
for _, gpu := range stat.Info.Resources.GPUs {

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200429155855-7f1c9c89e735
github.com/filecoin-project/sector-storage v0.0.0-20200501181153-e4a9a16161e9
github.com/filecoin-project/specs-actors v0.3.0
github.com/filecoin-project/specs-storage v0.0.0-20200417134612-61b2d91a6102
github.com/filecoin-project/storage-fsm v0.0.0-20200427182014-01487d5ad3c8

4
go.sum
View File

@ -177,8 +177,8 @@ github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/lotus v0.2.10/go.mod h1:om5PQA9ZT0lf16qI7Fz/ZGLn4LDCMqPC8ntZA9uncRE=
github.com/filecoin-project/sector-storage v0.0.0-20200411000242-61616264b16d/go.mod h1:/yueJueMh0Yc+0G1adS0lhnedcSnjY86EjKsA20+DVY=
github.com/filecoin-project/sector-storage v0.0.0-20200429155855-7f1c9c89e735 h1:dzhoQvScipNNexk7Jcgspgf+QLun+gT7FYBdDMMnqgE=
github.com/filecoin-project/sector-storage v0.0.0-20200429155855-7f1c9c89e735/go.mod h1:q/V90xaSKTlu7KovS0uj+cAvlPPFrGn141ZO3iQNEdw=
github.com/filecoin-project/sector-storage v0.0.0-20200501181153-e4a9a16161e9 h1:WG6rFyhbhwWNpB0IExJJpXA8ok4Dduwws/Qy9FYTYfc=
github.com/filecoin-project/sector-storage v0.0.0-20200501181153-e4a9a16161e9/go.mod h1:hvyaNnvsjZ4D/5tq78GWZJ39uTStLJcHRp7cWPeVBgY=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.0.0-20200409043918-e569f4a2f504/go.mod h1:mdJraXq5vMy0+/FqVQIrnNlpQ/Em6zeu06G/ltQ0/lA=
github.com/filecoin-project/specs-actors v0.2.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=

View File

@ -84,20 +84,25 @@ type client struct {
// NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
var config Config
for _, o := range opts {
o(&config)
}
connFactory := func() (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
return conn, err
}
if config.proxyConnFactory != nil {
connFactory = config.proxyConnFactory(connFactory)
}
conn, err := connFactory()
if err != nil {
return nil, err
}
var config Config
for _, o := range opts {
o(&config)
}
c := client{
namespace: namespace,
}

View File

@ -1,9 +1,15 @@
package jsonrpc
import "time"
import (
"time"
"github.com/gorilla/websocket"
)
type Config struct {
ReconnectInterval time.Duration
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
}
var defaultConfig = Config{

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http/httptest"
"strconv"
"strings"
@ -11,6 +12,7 @@ import (
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
@ -273,11 +275,13 @@ func TestUnmarshalableResult(t *testing.T) {
}
type ChanHandler struct {
wait chan struct{}
wait chan struct{}
ctxdone <-chan struct{}
}
func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error) {
out := make(chan int)
h.ctxdone = ctx.Done()
go func() {
defer close(out)
@ -377,6 +381,122 @@ func TestChan(t *testing.T) {
require.Equal(t, false, ok)
}
func TestChanServerClose(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
}
serverHandler := &ChanHandler{
wait: make(chan struct{}, 5),
}
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
tctx, tcancel := context.WithCancel(context.Background())
testServ := httptest.NewServer(rpcServer)
testServ.Config.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
return tctx
}
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
serverHandler.wait <- struct{}{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
require.Equal(t, 2, <-sub)
// make sure we're blocked
select {
case <-time.After(200 * time.Millisecond):
case <-sub:
t.Fatal("didn't expect to get anything from sub")
}
// close server
tcancel()
testServ.Close()
_, ok := <-sub
require.Equal(t, false, ok)
}
func TestServerChanLockClose(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
}
serverHandler := &ChanHandler{
wait: make(chan struct{}),
}
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
var closeConn func() error
_, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(),
"ChanHandler",
[]interface{}{&client}, nil,
func(c *Config) {
c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) {
return func() (*websocket.Conn, error) {
c, err := f()
if err != nil {
return nil, err
}
closeConn = c.UnderlyingConn().Close
return c, nil
}
}
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
go func() {
serverHandler.wait <- struct{}{}
}()
require.Equal(t, 2, <-sub)
for i := 0; i < 100; i++ {
serverHandler.wait <- struct{}{}
}
if err := closeConn(); err != nil {
t.Fatal(err)
}
<-serverHandler.ctxdone
}
func TestControlChanDeadlock(t *testing.T) {
for r := 0; r < 20; r++ {
testControlChanDeadlock(t)

View File

@ -436,13 +436,14 @@ func (c *wsConn) closeInFlight() {
Code: 2,
},
}
c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
}
c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
c.inflight = map[int64]clientRequest{}
c.handling = map[int64]context.CancelFunc{}
}