storage: Track worker hostnames with work

This commit is contained in:
Łukasz Magiera 2020-11-09 23:09:04 +01:00
parent 95d9084899
commit 27a9dd3bbb
6 changed files with 121 additions and 12 deletions

View File

@ -198,11 +198,16 @@ var sealingJobsCmd = &cli.Command{
dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String() dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String()
} }
hostname, ok := workerHostnames[l.wid]
if !ok {
hostname = l.Hostname
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n", _, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[10:]), hex.EncodeToString(l.ID.ID[10:]),
l.Sector.Number, l.Sector.Number,
hex.EncodeToString(l.wid[5:]), hex.EncodeToString(l.wid[5:]),
workerHostnames[l.wid], hostname,
l.Task.Short(), l.Task.Short(),
state, state,
dur) dur)

View File

@ -199,7 +199,7 @@ func (t *WorkState) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{164}); err != nil { if _, err := w.Write([]byte{166}); err != nil {
return err return err
} }
@ -282,6 +282,51 @@ func (t *WorkState) MarshalCBOR(w io.Writer) error {
if _, err := io.WriteString(w, string(t.WorkError)); err != nil { if _, err := io.WriteString(w, string(t.WorkError)); err != nil {
return err return err
} }
// t.WorkerHostname (string) (string)
if len("WorkerHostname") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"WorkerHostname\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("WorkerHostname"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("WorkerHostname")); err != nil {
return err
}
if len(t.WorkerHostname) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.WorkerHostname was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.WorkerHostname))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.WorkerHostname)); err != nil {
return err
}
// t.StartTime (int64) (int64)
if len("StartTime") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"StartTime\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("StartTime"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("StartTime")); err != nil {
return err
}
if t.StartTime >= 0 {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.StartTime)); err != nil {
return err
}
} else {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.StartTime-1)); err != nil {
return err
}
}
return nil return nil
} }
@ -360,6 +405,43 @@ func (t *WorkState) UnmarshalCBOR(r io.Reader) error {
t.WorkError = string(sval) t.WorkError = string(sval)
} }
// t.WorkerHostname (string) (string)
case "WorkerHostname":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.WorkerHostname = string(sval)
}
// t.StartTime (int64) (int64)
case "StartTime":
{
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
var extraI int64
if err != nil {
return err
}
switch maj {
case cbg.MajUnsignedInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 positive overflow")
}
case cbg.MajNegativeInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 negative oveflow")
}
extraI = -1 - extraI
default:
return fmt.Errorf("wrong type for int64 field: %d", maj)
}
t.StartTime = int64(extraI)
}
default: default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name) return fmt.Errorf("unknown struct field %d: '%s'", i, name)

View File

@ -383,7 +383,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing) selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces)) err := m.startWork(ctx, w, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces))
if err != nil { if err != nil {
return err return err
} }
@ -430,7 +430,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true) selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealPreCommit2(ctx, sector, phase1Out)) err := m.startWork(ctx, w, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
if err != nil { if err != nil {
return err return err
} }
@ -480,7 +480,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) err := m.startWork(ctx, w, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
if err != nil { if err != nil {
return err return err
} }
@ -520,7 +520,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
selector := newTaskSelector() selector := newTaskSelector()
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, wk)(w.SealCommit2(ctx, sector, phase1Out)) err := m.startWork(ctx, w, wk)(w.SealCommit2(ctx, sector, phase1Out))
if err != nil { if err != nil {
return err return err
} }

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -41,6 +42,9 @@ type WorkState struct {
WorkerCall storiface.CallID // Set when entering wsRunning WorkerCall storiface.CallID // Set when entering wsRunning
WorkError string // Status = wsDone, set when failed to start work WorkError string // Status = wsDone, set when failed to start work
WorkerHostname string // hostname of last worker handling this job
StartTime int64 // unix seconds
} }
func newWorkID(method sealtasks.TaskType, params ...interface{}) (WorkID, error) { func newWorkID(method sealtasks.TaskType, params ...interface{}) (WorkID, error) {
@ -167,8 +171,16 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
}, nil }, nil
} }
func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storiface.CallID, err error) error { func (m *Manager) startWork(ctx context.Context, w Worker, wk WorkID) func(callID storiface.CallID, err error) error {
return func(callID storiface.CallID, err error) error { return func(callID storiface.CallID, err error) error {
var hostname string
info, ierr := w.Info(ctx)
if ierr != nil {
hostname = "[err]"
} else {
hostname = info.Hostname
}
m.workLk.Lock() m.workLk.Lock()
defer m.workLk.Unlock() defer m.workLk.Unlock()
@ -194,6 +206,8 @@ func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storifac
ws.Status = wsRunning ws.Status = wsRunning
} }
ws.WorkerCall = callID ws.WorkerCall = callID
ws.WorkerHostname = hostname
ws.StartTime = time.Now().Unix()
return nil return nil
}) })
if err != nil { if err != nil {

View File

@ -67,12 +67,18 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
continue continue
} }
var ws WorkState
if err := m.work.Get(work).Get(&ws); err != nil {
log.Errorf("WorkerJobs: get work %s: %+v", work, err)
}
out[uuid.UUID{}] = append(out[uuid.UUID{}], storiface.WorkerJob{ out[uuid.UUID{}] = append(out[uuid.UUID{}], storiface.WorkerJob{
ID: id, ID: id,
Sector: id.Sector, Sector: id.Sector,
Task: work.Method, Task: work.Method,
RunWait: -1, RunWait: -1,
Start: time.Time{}, Start: time.Unix(ws.StartTime, 0),
Hostname: ws.WorkerHostname,
}) })
} }

View File

@ -48,6 +48,8 @@ type WorkerJob struct {
RunWait int // -1 - ret-wait, 0 - running, 1+ - assigned RunWait int // -1 - ret-wait, 0 - running, 1+ - assigned
Start time.Time Start time.Time
Hostname string `json:",omitempty"` // optional, set for ret-wait jobs
} }
type CallID struct { type CallID struct {