harmony: Fix last_contact race
This commit is contained in:
parent
adb0fc3259
commit
ca4e2d71b6
@ -3,7 +3,6 @@ package resources
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"regexp"
|
"regexp"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -49,30 +48,34 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
|
|||||||
}
|
}
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
{ // Learn our owner_id while updating harmony_machines
|
{ // Learn our owner_id while updating harmony_machines
|
||||||
var ownerID []int
|
var ownerID int
|
||||||
err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
|
|
||||||
}
|
|
||||||
gpuram := uint64(lo.Sum(reg.GpuRam))
|
gpuram := uint64(lo.Sum(reg.GpuRam))
|
||||||
if len(ownerID) == 0 {
|
|
||||||
err = db.QueryRow(ctx, `INSERT INTO harmony_machines
|
// Upsert query with last_contact update, fetch the machine ID
|
||||||
(host_and_port, cpu, ram, gpu, gpuram) VALUES
|
// (note this isn't a simple insert .. on conflict because host_and_port isn't unique)
|
||||||
($1,$2,$3,$4,$5) RETURNING id`,
|
err := db.QueryRow(ctx, `
|
||||||
hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, gpuram).Scan(®.Resources.MachineID)
|
WITH upsert AS (
|
||||||
|
UPDATE harmony_machines
|
||||||
|
SET cpu = $2, ram = $3, gpu = $4, gpuram = $5, last_contact = CURRENT_TIMESTAMP
|
||||||
|
WHERE host_and_port = $1
|
||||||
|
RETURNING id
|
||||||
|
),
|
||||||
|
inserted AS (
|
||||||
|
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram, last_contact)
|
||||||
|
SELECT $1, $2, $3, $4, $5, CURRENT_TIMESTAMP
|
||||||
|
WHERE NOT EXISTS (SELECT id FROM upsert)
|
||||||
|
RETURNING id
|
||||||
|
)
|
||||||
|
SELECT id FROM upsert
|
||||||
|
UNION ALL
|
||||||
|
SELECT id FROM inserted;
|
||||||
|
`, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, gpuram).Scan(&ownerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
reg.MachineID = ownerID
|
||||||
reg.MachineID = ownerID[0]
|
|
||||||
_, err := db.Exec(ctx, `UPDATE harmony_machines SET
|
|
||||||
cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$5`,
|
|
||||||
reg.Cpu, reg.Ram, reg.Gpu, gpuram, reg.Resources.MachineID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cleaned := CleanupMachines(context.Background(), db)
|
cleaned := CleanupMachines(context.Background(), db)
|
||||||
logger.Infow("Cleaned up machines", "count", cleaned)
|
logger.Infow("Cleaned up machines", "count", cleaned)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user