lotus/lib/harmony/resources/resources.go
Andrew Jackson (Ajax) 81ba6ab6f0
feat: Curio - Easy Migration (#11617)
* feat: lp mig - first few steps

* lp mig: default tasks

* code comments

* docs

* lp-mig-progress

* shared

* comments and todos

* fix: curio: rename lotus-provider to curio (#11645)

* rename provider to curio

* install gotext

* fix lint errors, mod tidy

* fix typo

* fix API_INFO and add gotext to circleCI

* add back gotext

* add gotext after remerge

* lp: channels doc

* finish easy-migration TODOs

* out generate

* merging and more renames

* avoid make-all

* minor doc stuff

* cu: make gen

* make gen fix

* make gen

* tryfix

* go mod tidy

* minor ez migration fixes

* ez setup - ui cleanups

* better error message

* guided setup colors

* better path to saveconfigtolayer

* loadconfigwithupgrades fix

* readMiner oops

* guided - homedir

* err if miner is running

* prompt error should exit

* process already running, miner_id sectors in migration

* dont prompt for language a second time

* check miner stopped

* unlock repo

* render and sql oops

* curio easyMig - some fixes

* easyMigration runs successfully

* lint

* review fixes

* fix backup path

* fixes1

* fixes2

* fixes 3

---------

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: LexLuthr <lexluthr@protocol.ai>
2024-03-15 16:38:13 -05:00

166 lines
3.7 KiB
Go

package resources
import (
"bytes"
"context"
"os/exec"
"regexp"
"runtime"
"sync/atomic"
"time"
"github.com/elastic/go-sysinfo"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
// LOOKS_DEAD_TIMEOUT is the age of last_contact beyond which CleanupMachines
// deletes a harmony_machines row. It is a variable, not a constant, so that
// the timeout can be changed (for example by unit tests).
var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
// Resources describes what a machine has to offer: the values detected by
// getResources, the machine's database id, and an optional Storage manager.
type Resources struct {
// Cpu is a CPU count; getResources fills it from runtime.NumCPU().
Cpu int
// Gpu is a GPU amount; getResources fills it from getGPUDevices().
Gpu float64
// Ram is a memory amount; getResources fills it with the host's
// "available" memory figure as reported by sysinfo.
Ram uint64
// MachineID is the harmony_machines row id; Register sets it.
MachineID int
// Storage is the optional storage manager (embedded interface).
// getResources does not set it, so it is nil unless a caller assigns one.
Storage
}
// Storage is an optional storage-management hook that can be embedded in
// Resources.
type Storage interface {
// HasCapacity reports whether capacity is available.
HasCapacity() bool
// Claim allows some other system to claim space for the task identified
// by taskID.
Claim(taskID int) error
// MarkComplete allows some other system to consider the task identified by
// taskID done. It is up to the caller to remove the data, if that applies.
MarkComplete(taskID int) error
}
// Reg is the handle returned by Register. It embeds the machine's Resources
// (including the MachineID assigned during registration).
type Reg struct {
Resources
// shutdown is set by Shutdown and polled by the heartbeat goroutine that
// Register starts; once true, that goroutine exits.
shutdown atomic.Bool
}
// logger is the package logger, registered under the name "harmonytask".
var logger = logging.Logger("harmonytask")
// lotusRE matches process names that getResources looks for in `ps -ef`
// output when checking whether more than one such process is running.
var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver")
// Register records this machine in the harmony_machines table and starts a
// background heartbeat for it.
//
// Local resources are detected with getResources. The row whose host_and_port
// equals hostnameAndPort is then updated, or a new row is inserted when no
// such row exists, and the resulting id becomes the returned Reg's MachineID.
// Afterwards stale rows are removed via CleanupMachines and a goroutine is
// launched that refreshes last_contact once per minute until Shutdown has
// been called on the returned Reg.
//
// An error is returned when resource detection fails, when the upsert query
// fails, or when the scanned id is nil.
func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
	detected, err := getResources()
	if err != nil {
		return nil, err
	}
	reg := &Reg{Resources: detected}

	ctx := context.Background()

	// Learn our machine id while updating harmony_machines.
	// host_and_port is not unique, so a plain INSERT .. ON CONFLICT cannot be
	// used: the statement updates first, inserts only when nothing was
	// updated, and returns the id either way.
	var machineID *int
	row := db.QueryRow(ctx, `
WITH upsert AS (
UPDATE harmony_machines
SET cpu = $2, ram = $3, gpu = $4, last_contact = CURRENT_TIMESTAMP
WHERE host_and_port = $1
RETURNING id
),
inserted AS (
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, last_contact)
SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP
WHERE NOT EXISTS (SELECT id FROM upsert)
RETURNING id
)
SELECT id FROM upsert
UNION ALL
SELECT id FROM inserted;
`, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu)
	if scanErr := row.Scan(&machineID); scanErr != nil {
		return nil, xerrors.Errorf("inserting machine entry: %w", scanErr)
	}
	if machineID == nil {
		return nil, xerrors.Errorf("no owner id")
	}
	reg.MachineID = *machineID

	removed := CleanupMachines(context.Background(), db)
	logger.Infow("Cleaned up machines", "count", removed)

	// Heartbeat: sleep a minute, stop if Shutdown was called, otherwise bump
	// last_contact and repeat. A failed update is logged and retried on the
	// next tick.
	heartbeat := func() {
		time.Sleep(time.Minute)
		for !reg.shutdown.Load() {
			if _, execErr := db.Exec(ctx, `UPDATE harmony_machines SET last_contact=CURRENT_TIMESTAMP where id=$1`, reg.MachineID); execErr != nil {
				logger.Error("Cannot keepalive ", execErr)
			}
			time.Sleep(time.Minute)
		}
	}
	go heartbeat()

	return reg, nil
}
// CleanupMachines deletes every harmony_machines row whose last_contact is
// older than LOOKS_DEAD_TIMEOUT and returns the count reported by db.Exec.
// A failed delete is only logged as a warning; the count from db.Exec is
// returned regardless.
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
	// The timeout is sent in milliseconds, which enables unit testing to
	// change the timeout.
	timeoutMs := LOOKS_DEAD_TIMEOUT.Milliseconds()

	removed, execErr := db.Exec(ctx,
		`DELETE FROM harmony_machines WHERE last_contact < CURRENT_TIMESTAMP - INTERVAL '1 MILLISECOND' * $1 `,
		timeoutMs)
	if execErr != nil {
		logger.Warn("unable to delete old machines: ", execErr)
	}
	return removed
}
// Shutdown marks this registration as shut down. The heartbeat goroutine
// started by Register checks the flag after each one-minute sleep and exits
// once it is set.
func (r *Reg) Shutdown() {
	r.shutdown.Store(true)
}
// getResources detects the resources of the local machine.
//
// It first runs `ps -ef` as a best-effort safety check: when more than one
// output line matches lotusRE a warning is logged, and when `ps` itself fails
// that is logged too; neither case is an error. It then returns a Resources
// with Cpu from runtime.NumCPU(), Ram from the host's available memory as
// reported by sysinfo, and Gpu from getGPUDevices(). MachineID and Storage are
// left at their zero values.
//
// An error is returned only when the host or memory information cannot be
// read.
func getResources() (res Resources, err error) {
	psOutput, psErr := exec.Command(`ps`, `-ef`).CombinedOutput()
	switch {
	case psErr != nil:
		logger.Warn("Could not safety check for 2+ processes: ", psErr)
	default:
		// Count the process-list lines that mention a known process name.
		matching := 0
		for _, line := range bytes.Split(psOutput, []byte("\n")) {
			if lotusRE.Match(line) {
				matching++
			}
		}
		if matching > 1 {
			logger.Warn("curio's defaults are for running alone. Use task maximums or CGroups.")
		}
	}

	host, err := sysinfo.Host()
	if err != nil {
		return Resources{}, err
	}

	memInfo, err := host.Memory()
	if err != nil {
		return Resources{}, err
	}

	res.Cpu = runtime.NumCPU()
	res.Ram = memInfo.Available
	res.Gpu = getGPUDevices()
	return res, nil
}
// DiskFree returns the free space, in bytes, of the filesystem containing
// path, computed as the free-block count times the block size reported by
// statfs. Any error from unix.Statfs is returned unchanged with a zero size.
//
// NOTE(review): this uses Bfree (all free blocks); per statfs(2), Bavail is
// the count available to unprivileged users. Confirm which is intended.
func DiskFree(path string) (uint64, error) {
	var fsStat unix.Statfs_t
	if statErr := unix.Statfs(path, &fsStat); statErr != nil {
		return 0, statErr
	}

	blockSize := uint64(fsStat.Bsize)
	return fsStat.Bfree * blockSize, nil
}