workers: Basic monitoring tools

Łukasz Magiera 2020-03-23 15:56:22 +01:00
parent 80d0f1f2c1
commit a3ba8eb0d7
11 changed files with 177 additions and 25 deletions


@@ -113,6 +113,8 @@ type StorageMiner interface {
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error
WorkerStats(context.Context) (map[uint64]WorkerStats, error)
stores.SectorIndex
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
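
As an aside, here is a minimal sketch (not part of this commit) of how a client holding a StorageMiner API handle might consume the new call; printWorkerStats, minerApi and the surrounding setup are assumptions for illustration only.

// Sketch only: assumes an already-connected api.StorageMiner client and
// imports of "context", "fmt" and "github.com/filecoin-project/lotus/api".
func printWorkerStats(ctx context.Context, minerApi api.StorageMiner) error {
	stats, err := minerApi.WorkerStats(ctx) // keyed by worker ID
	if err != nil {
		return err
	}
	for id, st := range stats {
		fmt.Printf("worker %d (%s): GPU in use: %v, CPU cores in use: %d\n",
			id, st.Info.Hostname, st.GpuUsed, st.CpuUse)
	}
	return nil
}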


@@ -35,3 +35,12 @@ type WorkerInfo struct {
Resources WorkerResources
}
type WorkerStats struct {
Info WorkerInfo
MemUsedMin uint64
MemUsedMax uint64
GpuUsed bool
CpuUse int
}
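
As the CLI code below reads these fields, MemUsedMin appears to be the memory charged against physical RAM only, MemUsedMax the total including swap-backed use, and CpuUse == -1 means all cores on the worker are busy; that is an interpretation of the code in this diff, not documented API semantics.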


@@ -182,6 +182,8 @@ type StorageMinerStruct struct {
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]api.WorkerStats, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
@@ -658,6 +660,10 @@ func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) erro
return c.Internal.WorkerConnect(ctx, url)
}
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]api.WorkerStats, error) {
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st stores.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st)
}
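
Following the existing pattern in StorageMinerStruct, the new WorkerStats entry is declared as an Internal function field tagged perm:"admin", so the RPC is only reachable with an admin token, and the exported wrapper above is a one-line delegation to it; the TODO on WorkerConnect hints that a narrower worker permission may follow.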


@@ -30,6 +30,7 @@ func main() {
sectorsCmd,
storageCmd,
setPriceCmd,
workersCmd,
}
jaeger := tracing.SetupJaegerTracing("lotus")
defer func() {


@@ -134,9 +134,19 @@ var storageListCmd = &cli.Command{
return err
}
for id, sectors := range st {
sorted := make([]struct{stores.ID; sectors []stores.Decl}, 0, len(st))
for id, decls := range st {
sorted = append(sorted, struct{stores.ID; sectors []stores.Decl}{id, decls})
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].ID < sorted[j].ID
})
for _, s := range sorted {
var cnt [3]int
for _, decl := range sectors {
for _, decl := range s.sectors {
for i := range cnt {
if decl.SectorFileType&(1<<i) != 0 {
cnt[i]++
@@ -144,10 +154,10 @@ var storageListCmd = &cli.Command{
}
}
fmt.Printf("%s:\n", id)
fmt.Printf("%s:\n", s.ID)
fmt.Printf("\tUnsealed: %d; Sealed: %d; Caches: %d\n", cnt[0], cnt[1], cnt[2])
si, err := nodeApi.StorageInfo(ctx, id)
si, err := nodeApi.StorageInfo(ctx, s.ID)
if err != nil {
return err
}
@@ -166,7 +176,7 @@ var storageListCmd = &cli.Command{
fmt.Println("Use: ReadOnly")
}
if localPath, ok := local[id]; ok {
if localPath, ok := local[s.ID]; ok {
fmt.Printf("\tLocal: %s\n", localPath)
}
for _, l := range si.URLs {
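
Collecting the map entries into a slice and sorting by stores.ID makes the listing order deterministic; Go randomizes map iteration, so the previous loop would reorder storage paths on every run, which is especially visible when the command is refreshed under watch (as miner-mon.sh below does).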


@@ -1 +1,81 @@
package main
import (
"fmt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"gopkg.in/urfave/cli.v2"
"sort"
lcli "github.com/filecoin-project/lotus/cli"
)
var workersCmd = &cli.Command{
Name: "workers",
Usage: "interact with workers",
Subcommands: []*cli.Command{
workersListCmd,
},
}
var workersListCmd = &cli.Command{
Name: "list",
Usage: "list workers",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
stats, err := nodeApi.WorkerStats(ctx)
if err != nil {
return err
}
st := make([]struct{id uint64; api.WorkerStats}, 0, len(stats))
for id, stat := range stats {
st = append(st, struct{id uint64; api.WorkerStats}{id, stat})
}
sort.Slice(st, func(i, j int) bool {
return st[i].id < st[j].id
})
for _, stat := range st {
gpuUse := "not "
if stat.GpuUsed {
gpuUse = ""
}
fmt.Printf("Worker %d, host %s\n", stat.id, stat.Info.Hostname)
if stat.CpuUse != -1 {
fmt.Printf("\tCPU: %d core(s) in use\n", stat.CpuUse)
} else {
fmt.Printf("\tCPU: all cores in use\n")
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s, %sused\n", gpu, gpuUse)
}
fmt.Printf("\tMemory: System: Physical %s, Swap %s, Reserved %s (%d%% phys)\n",
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemSwap)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved)),
stat.Info.Resources.MemReserved * 100 / stat.Info.Resources.MemPhysical)
fmt.Printf("\t\tUsed: Physical %s (%d%% phys), Virtual %s (%d%% phys, %d%% virt)\n",
types.SizeStr(types.NewInt(stat.MemUsedMin)),
stat.MemUsedMin * 100 / stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(stat.MemUsedMax)),
stat.MemUsedMax * 100 / stat.Info.Resources.MemPhysical,
stat.MemUsedMax * 100 / (stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap))
}
return nil
},
}
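
The new subcommand is invoked as lotus-storage-miner workers list; entries are sorted by worker ID, so the output stays stable between refreshes, which matters for the watch loop in miner-mon.sh below.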


@@ -51,11 +51,10 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
sm.StorageMgr.ServeHTTP(w, r)
}
/*
func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) {
stat := sm.SectorBuilder.WorkerStats()
return stat, nil
}*/
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uint64]api.WorkerStats, error) {
return sm.StorageMgr.WorkerStats(), nil
}
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
return sm.Miner.Address(), nil

scripts/miner-mon.sh Executable file

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
SESSION=$(cat /proc/sys/kernel/random/uuid)
tmux -2 new-session -d -s $SESSION
tmux new-window -t $SESSION:1 -n 'Storage Miner'
tmux split-window -h
tmux select-pane -t 0
tmux send-keys "watch -n1 './lotus-storage-miner info'" C-m
tmux split-window -v
tmux select-pane -t 1
tmux send-keys "watch -n1 './lotus-storage-miner workers list'" C-m
tmux select-pane -t 2
tmux send-keys "watch -n1 './lotus-storage-miner storage list'" C-m
tmux -2 attach-session -t $SESSION
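
The script expects to be launched from the directory containing the lotus-storage-miner binary (all three panes invoke ./lotus-storage-miner), names the tmux session with a random UUID, and attaches to it; detaching leaves the three watch loops running in the background session.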


@@ -46,7 +46,7 @@ type SectorManager interface {
storage.Prover
}
type workerID uint64
type WorkerID uint64
type Manager struct {
scfg *sectorbuilder.Config
@@ -60,12 +60,12 @@ type Manager struct {
storage.Prover
workersLk sync.Mutex
nextWorker workerID
workers map[workerID]*workerHandle
nextWorker WorkerID
workers map[WorkerID]*workerHandle
newWorkers chan *workerHandle
schedule chan *workerRequest
workerFree chan workerID
workerFree chan WorkerID
closing chan struct{}
schedQueue *list.List // List[*workerRequest]
@@ -99,11 +99,11 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
index: si,
nextWorker: 0,
workers: map[workerID]*workerHandle{},
workers: map[WorkerID]*workerHandle{},
newWorkers: make(chan *workerHandle),
schedule: make(chan *workerRequest),
workerFree: make(chan workerID),
workerFree: make(chan WorkerID),
closing: make(chan struct{}),
schedQueue: list.New(),
@@ -168,12 +168,12 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, secto
panic("implement me")
}
func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) {
func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]WorkerID, map[WorkerID]stores.StorageInfo) {
m.workersLk.Lock()
defer m.workersLk.Unlock()
var workers []workerID
paths := map[workerID]stores.StorageInfo{}
var workers []WorkerID
paths := map[WorkerID]stores.StorageInfo{}
for i, worker := range m.workers {
tt, err := worker.w.TaskTypes(context.TODO())
@@ -219,7 +219,7 @@ func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.St
return workers, paths
}
func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []workerID) (Worker, func(), error) {
func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []WorkerID) (Worker, func(), error) {
ret := make(chan workerResponse)
select {
@@ -355,7 +355,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
}
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
var candidateWorkers []workerID
var candidateWorkers []WorkerID
m.workersLk.Lock()
for id, worker := range m.workers {


@@ -12,7 +12,7 @@ const mib = 1 << 20
type workerRequest struct {
taskType sealtasks.TaskType
accept []workerID // ordered by preference
accept []WorkerID // ordered by preference
ret chan<- workerResponse
cancel <-chan struct{}
@@ -71,7 +71,7 @@ func (m *Manager) runSched() {
}
}
func (m *Manager) onWorkerFreed(wid workerID) {
func (m *Manager) onWorkerFreed(wid WorkerID) {
for e := m.schedQueue.Front(); e != nil; e = e.Next() {
req := e.Value.(*workerRequest)
var ok bool
@@ -140,7 +140,7 @@ func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error)
return nil, nil // put in waiting queue
}
func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse {
func (m *Manager) makeResponse(wid WorkerID, w *workerHandle, req *workerRequest) *workerResponse {
needRes := ResourceTable[req.taskType][m.scfg.SealProofType]
w.gpuUsed = needRes.CanGPU
@@ -186,7 +186,7 @@ func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest
}
}
func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) {
func (m *Manager) canHandleRequest(wid WorkerID, w *workerHandle, req *workerRequest) (bool, error) {
needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType]
if !ok {
return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType)


@@ -0,0 +1,22 @@
package sectorstorage
import "github.com/filecoin-project/lotus/api"
func (m *Manager) WorkerStats() map[uint64]api.WorkerStats {
m.workersLk.Lock()
defer m.workersLk.Unlock()
out := map[uint64]api.WorkerStats{}
for id, handle := range m.workers {
out[uint64(id)] = api.WorkerStats{
Info: handle.info,
MemUsedMin: handle.memUsedMin,
MemUsedMax: handle.memUsedMax,
GpuUsed: handle.gpuUsed,
CpuUse: handle.cpuUse,
}
}
return out
}
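
WorkerStats holds workersLk while copying, so API callers get a consistent snapshot of the per-worker memory, GPU and CPU accounting without the scheduler's internal workerHandle map ever being exposed; the uint64 keys are simply the manager's WorkerID values cast for the API layer.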