@ -156,7 +156,7 @@ require (
golang.org/x/sys v0.9.0
@ -167,6 +167,8 @@ require (
require (
require (
github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006 // indirect
github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef // indirect
@ -177,8 +179,10 @@ require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
@ -202,7 +206,10 @@ require (
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.9.1 // indirect
@ -211,7 +218,11 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
@ -256,10 +267,12 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
@ -280,6 +293,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@ -291,6 +306,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
@ -306,12 +322,15 @@ require (
github.com/samber/lo v1.38.1 // indirect
github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
@ -327,6 +346,7 @@ require (
golang.org/x/arch v0.3.0 // indirect
@ -59,6 +59,8 @@ github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZ
github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006 h1:TKWkFaRW5EPQyrS1pM0vm3vvqw/jmHu+FkV8gRD+7/w=
github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006/go.mod h1:9ILtD1/UTP/Y7JMCU8loWZMDvhrQuTgHzHatG6z9ZdQ=
@ -66,6 +68,8 @@ github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa/go.mod h1:WUmMvh9wMtq
github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef h1:DiNnYI6NBdeXGOJXptJcrYeDavJf4tImz/B4MOVQtMs=
github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef/go.mod h1:RRVtxaQlBBnbo+n2fgYHhxQmXDkRLKWcWX93lJL0Yhw=
@ -142,6 +146,9 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
@ -152,6 +159,9 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
@ -399,6 +411,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
@ -445,10 +459,16 @@ github.com/go-openapi/swag v0.19.11/go.mod h1:Uc0gKkdR+ojzsEpjh39QChyu92vPgIr72P
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
@ -970,6 +992,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad h1:zyvTnsJPPAqVg2v3bbvTI+RdbVPJufZ+CWCPOX0Dtp8=
github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad/go.mod h1:KCqoxhWgoxCWg13iOq53YFf50jlonuuhIpO916aWEkg=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
@ -1628,6 +1663,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
@ -1789,6 +1826,9 @@ go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
@ -2066,6 +2106,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -2319,6 +2361,7 @@ lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1
@ -0,0 +1,247 @@
package itests
import (
type task1 struct {
toAdd []int
myPersonalTableLock sync.Mutex
myPersonalTable map[harmonytask.TaskID]int // This would typicallyb be a DB table
WorkCompleted []string
func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
if !stillOwned() {
return false, errors.New("Why not still owned?")
defer t.myPersonalTableLock.Unlock()
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID]))
return true, nil
func (t *task1) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return &list[0], nil
func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 100,
Name: "ThingOne",
MaxFailures: 1,
Cost: resources.Resources{
Cpu: 1,
Ram: 100 << 10, // at 100kb, it's tiny
func (t *task1) Adder(add harmonytask.AddTaskFunc) {
for _, v := range t.toAdd {
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
defer t.myPersonalTableLock.Unlock()
t.myPersonalTable[tID] = v
return true
func TestHarmonyTasks(t *testing.T) {
withSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
t1 := &task1{
toAdd: []int{56, 73},
myPersonalTable: map[harmonytask.TaskID]int{},
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
require.Equal(t, t1.WorkCompleted, 2, "wrong amount of work complete: expected 2 got:")
got := strings.Join(t1.WorkCompleted, ",")
expected := "taskResult56,taskResult73"
if got != expected {
t.Fatal("Unexpected results! Wanted " + expected + " got " + got)
// TODO test history table looks right.
type passthru struct {
dtl harmonytask.TaskTypeDetails
do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error)
canAccept func(list []harmonytask.TaskID) (*harmonytask.TaskID, error)
adder func(add harmonytask.AddTaskFunc)
func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
func (t *passthru) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return t.canAccept(list)
func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails {
return t.dtl
func (t *passthru) Adder(add harmonytask.AddTaskFunc) {
if t.adder != nil {
// Common stuff
var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}}
var letters []string
var lettersMutex sync.Mutex
func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil },
adder: func(add harmonytask.AddTaskFunc) {
for _, v := range []string{"A", "B"} {
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
_, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v)
require.NoError(t, err)
return true
func fooLetterSaver(t *testing.T, cdb *harmonydb.DB) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
defer lettersMutex.Unlock()
letters = append(letters, content)
return true, nil
func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
withSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
workerParty := fooLetterSaver(t, cdb)
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, letters, []string{"A", "B"})
func TestWorkStealing(t *testing.T) {
withSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
ctx := context.Background()
// The dead worker will be played by a few SQL INSERTS.
_, err := cdb.Exec(ctx, `INSERT INTO harmony_machines
(id, last_contact,host_and_port, cpu, ram, gpu, gpuram)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`)
require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, `INSERT INTO harmony_task
(id, name, owner_id, posted_time, added_by)
VALUES (1234, 'foo', 300, DATE '2000-01-01', 300)`)
require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int, content) VALUES (1234, 'M')")
require.ErrorIs(t, err, nil)
harmonytask.POLL_DURATION = time.Millisecond * 100
harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb)}, "test:2")
require.ErrorIs(t, err, nil)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, []string{"M"}, letters)
func TestTaskRetry(t *testing.T) {
withSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
alreadyFailed := map[string]bool{}
fails2xPerMsg := &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
defer lettersMutex.Unlock()
if !alreadyFailed[content] {
alreadyFailed[content] = true
return false, errors.New("intentional 'error'")
letters = append(letters, content)
return true, nil
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second)
require.Equal(t, []string{"A", "B"}, letters)
type hist struct {
TaskID int
Result bool
Err string
var res []hist
require.NoError(t, cdb.Select(context.Background(), &res,
`SELECT task_id, result, err FROM harmony_task_history
ORDER BY result DESC, task_id`))
require.Equal(t, []hist{
{1, true, ""},
{2, true, ""},
{1, false, "error: intentional 'error'"},
{2, false, "error: intentional 'error'"}}, res)
FUTURE test fast-pass round-robin via http calls (3party) once the API for that is set
It's necessary for WinningPoSt.
FUTURE test follows.
It's necessary for sealing work.
@ -118,21 +118,25 @@ type tracer struct {
type ctxkey string
type ctxkey string
var sqlStart = ctxkey("sqlStart")
const SQL_START = ctxkey("sqlStart")
const SQL_STRING = ctxkey("sqlString")
func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
return context.WithValue(ctx, sqlStart, time.Now())
return context.WithValue(context.WithValue(ctx, SQL_START, time.Now()), SQL_STRING, data.SQL)
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
ms := time.Since(ctx.Value(sqlStart).(time.Time)).Milliseconds()
ms := time.Since(ctx.Value(SQL_START).(time.Time)).Milliseconds()
if data.Err != nil {
if data.Err != nil {
// Can log what type of query it is, but not what tables
logger.Debugw("SQL run",
// Can log rows affected.
"query", ctx.Value(SQL_STRING).(string),
"err", data.Err,
"rowCt", data.CommandTag.RowsAffected(),
"milliseconds", ms)
// addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB.
// addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB.
@ -250,8 +254,9 @@ func (db *DB) upgrade() error {
_, err = db.pgx.Exec(context.Background(), s)
_, err = db.pgx.Exec(context.Background(), s)
if err != nil {
if err != nil {
db.log(fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()))
msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())
return err
return errors.New(msg) // makes devs lives easier by placing message at the end.
@ -2,5 +2,6 @@ CREATE TABLE itest_scratch (
content TEXT,
content TEXT,
some_int INTEGER,
some_int INTEGER,
second_int INTEGER,
update_time TIMESTAMP DEFAULT current_timestamp
update_time TIMESTAMP DEFAULT current_timestamp
@ -0,0 +1,52 @@
/* For HarmonyTask base implementation. */
CREATE TABLE harmony_machines (
last_contact TIMESTAMP NOT NULL DEFAULT current_timestamp,
host_and_port varchar(300) NOT NULL,
CREATE TABLE harmony_task (
initiated_by INTEGER,
update_time TIMESTAMP NOT NULL DEFAULT current_timestamp,
owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL,
previous_task INTEGER,
name varchar(8) NOT NULL
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.';
COMMENT ON COLUMN harmony_task.name IS 'The name of the task type.';
COMMENT ON COLUMN harmony_task.owner_id IS 'may be null if between owners or not yet taken';
COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified. not a heartbeat';
CREATE TABLE harmony_task_history (
err varchar
COMMENT ON COLUMN harmony_task_history.result IS 'Use to detemine if this was a successful run.';
CREATE TABLE harmony_task_follow (
to_type VARCHAR(8) NOT NULL,
from_type VARCHAR(8) NOT NULL
CREATE TABLE harmony_task_impl (
Normal file
Normal file
@ -0,0 +1,79 @@
Package harmomnytask implements a pure (no task logic), distributed
task manager. This clean interface allows a task implementer to completely
avoid being concerned with task scheduling and management.
It's based on the idea of tasks as small units of work broken from other
work by hardware, parallelizabilty, reliability, or any other reason.
Workers will be Greedy: vaccuuming up their favorite jobs from a list.
Once 1 task is accepted, harmonydb tries to get other task runner
machines to accept work (round robin) before trying again to accept.
Mental Model:
Things that block tasks:
- task not registered for any running server
- max was specified and reached
- resource exhaustion
- CanAccept() interface (per-task implmentation) does not accept it.
Ways tasks start: (slowest first)
- DB Read every 1 minute
- Bump via HTTP if registered in DB
- Task was added (to db) by this process
Ways tasks get added:
- Async Listener task (for chain, etc)
- Followers: Tasks get added because another task completed
When Follower collectors run:
- If both sides are process-local, then
- Otherwise, at the listen interval during db scrape
How duplicate tasks are avoided:
- that's up to the task definition, but probably a unique key
To use:
1.Implement TaskInterface for a new task.
2 Have New() receive this & all other ACTIVE implementations.
As we are not expecting DBAs in this database, it's important to know
what grows uncontrolled. The only harmony_* table is _task_history
(somewhat quickly) and harmony_machines (slowly). These will need a
clean-up for after the task data could never be acted upon.
but the design **requires** extraInfo tables to grow until the task's
info could not possibly be used by a following task, including slow
release rollout. This would normally be in the order of months old.
Other possible enhancements include more collaboative coordination
to assign a task to machines closer to the data.
harmony_task is the list of work that has not been completed.
AddTaskFunc manages the additions, but is designed to have its
transactions failed-out on overlap with a similar task already written.
It's up to the TaskInterface implementer to discover this overlap via
some other table it uses (since overlap can mean very different things).
This holds transactions that completed or saw too many retries. It also
serves as input for subsequent (follower) tasks to kick off. This is not
done machine-internally because a follower may not be on the same machine
as the previous task.
Managed by lib/harmony/resources, this is a reference to machines registered
via the resources. This registration does not obligate the machine to
anything, but serves as a discovery mechanism. Paths are hostnames + ports
which are presumed to support http, but this assumption is only used by
the task system.
harmony_task_follow / harmony_task_impl
These tables are used to fast-path notifications to other machines instead
of waiting for polling. _impl helps round-robin work pick-up. _follow helps
discover the machines that are interested in creating tasks following the
task that just completed.
package harmonytask
Normal file
Normal file
@ -0,0 +1,386 @@
package harmonytask
import (
// Consts (except for unit test)
var POLL_DURATION = time.Minute // Poll for Work this frequently
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Negative means unrestricted.
Max int
// Name is the task name to be added to the task list.
Name string
// Peak costs to Do() the task.
Cost resources.Resources
// Max Failure count before the job is dropped.
// 0 = retry forever
MaxFailures uint
// Follow another task's completion via this task's creation.
// The function should populate extraInfo from data
// available from the previous task's tables, using the given TaskID.
// It should also return success if the trigger succeeded.
// NOTE: if refatoring tasks, see if your task is
// necessary. Ex: Is the sector state correct for your stage to run?
Follows map[string]func(TaskID, AddTaskFunc) bool
// TaskInterface must be implemented in order to have a task used by harmonytask.
type TaskInterface interface {
// Do the task assigned. Call stillOwned before making single-writer-only
// changes to ensure the work has not been stolen.
// This is the ONLY function that should attempt to do the work, and must
// ONLY be called by harmonytask.
// Indicate if the task no-longer needs scheduling with done=true including
// cases where it's past the deadline.
Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
// CanAccept should return if the task can run on this machine. It should
// return null if the task type is not allowed on this machine.
// It should select the task it most wants to accomplish.
// It is also responsible for determining disk space (including scratch).
CanAccept([]TaskID) (*TaskID, error)
// TypeDetails() returns static details about how this task behaves and
// how this machine will run it. Read once at the beginning.
TypeDetails() TaskTypeDetails
// This listener will consume all external sources continuously for work.
// Do() may also be called from a backlog of work. This must not
// start doing the work (it still must be scheduled).
// Note: Task de-duplication should happen in ExtraInfoFunc by
// returning false, typically by determining from the tx that the work
// exists already. The easy way is to have a unique joint index
// across all fields that will be common.
// Adder should typically only add its own task type, but multiple
// is possible for when 1 trigger starts 2 things.
// Usage Example:
// func (b *BazType)Adder(addTask AddTaskFunc) {
// for {
// bazMaker := <- bazChannel
// addTask("baz", func(t harmonytask.TaskID, txn db.Transaction) bool {
// _, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot)
// VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot)
// if err != nil {
// scream(err)
// return false
// }
// return true
// })
// }
// }
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool)
type TaskEngine struct {
ctx context.Context
handlers []*taskTypeHandler
db *harmonydb.DB
workAdderMutex *notifyingMx
reg *resources.Reg
grace context.CancelFunc
taskMap map[string]*taskTypeHandler
ownerID int
tryAllWork chan bool // notify if work completed
follows map[string][]followStruct
lastFollowTime time.Time
lastCleanup atomic.Value
type followStruct struct {
f func(TaskID, AddTaskFunc) bool
h *taskTypeHandler
type TaskID int
// New creates all the task definitions. Note that TaskEngine
// knows nothing about the tasks themselves and serves to be a
// generic container for common work
func New(
db *harmonydb.DB,
impls []TaskInterface,
hostnameAndPort string) (*TaskEngine, error) {
reg, err := resources.Register(db, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("cannot get resources: %w", err)
ctx, grace := context.WithCancel(context.Background())
e := &TaskEngine{
ctx: ctx,
grace: grace,
db: db,
reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
workAdderMutex: ¬ifyingMx{},
taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct),
for _, c := range impls {
h := taskTypeHandler{
TaskInterface: c,
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name)
VALUES ($1,$2)`, e.ownerID, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update impl: %w", err)
for name, fn := range c.TypeDetails().Follows {
e.follows[name] = append(e.follows[name], followStruct{fn, &h})
// populate harmony_task_follows
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task)
VALUES ($1,$2,$3)`, e.ownerID, name, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update harmony_task_follows: %w", err)
// resurrect old work
var taskRet []struct {
ID int
Name string
err := db.Select(e.ctx, &taskRet, `SELECT id, name from harmony_task WHERE owner_id=$1`, e.ownerID)
if err != nil {
return nil, err
for _, w := range taskRet {
// edge-case: if old assignments are not available tasks, unlock them.
h := e.taskMap[w.Name]
if h == nil {
_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner=NULL WHERE id=$1`, w.ID)
if err != nil {
log.Error("Cannot remove self from owner field: ", err)
continue // not really fatal, but not great
if !h.considerWork([]TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
for _, h := range e.handlers {
go h.Adder(h.AddTask)
go e.poller()
return e, nil
// GracefullyTerminate hangs until all present tasks have completed.
// Call this to cleanly exit the process. As some processes are long-running,
// passing a deadline will ignore those still running (to be picked-up later).
func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
deadlineChan := time.NewTimer(deadline).C
// block bumps & follows by unreg from DBs.
_, err := e.db.Exec(context.Background(), `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
_, err = e.db.Exec(context.Background(), `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
for _, h := range e.handlers {
if h.Count.Load() > 0 {
select {
case <-deadlineChan:
goto top
func (e *TaskEngine) poller() {
for {
select {
case <-e.tryAllWork: ///////////////////// Find work after some work finished
case <-time.NewTicker(POLL_DURATION).C: // Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit
e.followWorkInDB() // "Follows" the slow way
e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way
// followWorkInDB implements "Follows" the slow way
func (e *TaskEngine) followWorkInDB() {
// Step 1: What are we following?
var lastFollowTime time.Time
lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now()
for from_name, srcs := range e.follows {
var cList []int // Which work is done (that we follow) since we last checked?
err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history
WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, from_name)
if err != nil {
log.Error("Could not query DB: ", err)
for _, src := range srcs {
for _, workAlreadyDone := range cList { // Were any tasks made to follow these tasks?
var ct int
err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task
WHERE name=$1 AND previous_task=$2`, src.h.Name, workAlreadyDone).Scan(&ct)
if err != nil {
log.Error("Could not query harmony_task: ", err)
return // not recoverable here
if ct > 0 {
// we need to create this task
if !src.h.Follows[from_name](TaskID(workAlreadyDone), src.h.AddTask) {
// But someone may have beaten us to it.
log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, from_name)
// pollerTryAllWork implements "Bumps" (next task) the slow way
func (e *TaskEngine) pollerTryAllWork() {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
resources.CleanupMachines(e.ctx, e.db)
for _, v := range e.handlers {
if v.AssertMachineHasCapacity() != nil {
var unownedTasks []TaskID
err := e.db.Select(e.ctx, &unownedTasks, `SELECT id
FROM harmony_task
WHERE owner_id IS NULL AND name=$1
ORDER BY update_time`, v.Name)
if err != nil {
log.Error("Unable to read work ", err)
accepted := v.considerWork(unownedTasks)
if !accepted {
log.Warn("Work not accepted")
if len(unownedTasks) > 1 {
e.bump(v.Name) // wait for others before trying again to add work.
goto rerun
// AddHttpHandlers TODO this needs to be called by the http server to register routes.
// This implements the receiver-side of "follows" and "bumps" the fast way.
func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) {
s := root.Group("/scheduler/")
f := s.Group("/follows")
for name, v := range e.follows {
f.GET("/"+name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.AbortWithError(401, err)
taskAdded := false
for _, v := range v {
taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask)
if taskAdded {
e.tryAllWork <- true
c.Status(202) // NOTE: 202 for "accepted" but not worked.
b := s.Group("/bump")
for _, h := range e.handlers {
b.GET("/"+h.Name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.AbortWithError(401, err)
// We NEED to block while trying to deliver
// this work to ease the network impact.
if h.considerWork([]TaskID{TaskID(tID)}) {
c.Status(202) // NOTE: 202 for "accepted" but not worked.
func (e *TaskEngine) bump(taskType string) {
var res []string
err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m
JOIN harmony_task_impl i ON i.owner_id=m.id
WHERE i.name=$1`, taskType)
if err != nil {
log.Error("Could not read db for bump: ", err)
for _, url := range res {
resp, err := hClient.Get(url + "/scheduler/bump/" + taskType)
if err != nil {
log.Info("Server unreachable to bump: ", err)
if resp.StatusCode == 200 {
return // just want 1 taker.
// resourcesInUse requires workListsMutex to be already locked.
func (e *TaskEngine) resourcesInUse() resources.Resources {
tmp := e.reg.Resources
for _, t := range e.handlers {
ct := t.Count.Load()
tmp.Cpu -= int(ct) * t.Cost.Cpu
tmp.Gpu -= float64(ct) * t.Cost.Gpu
tmp.Ram -= uint64(ct) * t.Cost.Ram
return tmp
Normal file
Normal file
@ -0,0 +1,16 @@
package harmonytask
import "sync"
type notifyingMx struct {
UnlockNotify func()
func (n *notifyingMx) Unlock() {
tmp := n.UnlockNotify
if tmp != nil {
Normal file
Normal file
@ -0,0 +1,276 @@
package harmonytask
import (
logging "github.com/ipfs/go-log/v2"
var log = logging.Logger("harmonytask")
type taskTypeHandler struct {
TaskEngine *TaskEngine
Count atomic.Int32 /// locked by TaskEngine's mutex
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
var tID TaskID
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
// create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID)
if err != nil {
log.Error("Could not insert into harmonyTask", err)
return false
err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID)
if err != nil {
log.Error("Could not select ID: ", err)
return extra(tID, tx)
if err != nil {
if !did {
if !h.considerWork([]TaskID{tID}) {
h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
func (h *taskTypeHandler) considerWork(ids []TaskID) (workAccepted bool) {
if len(ids) == 0 {
return true // stop looking for takers
// 1. Can we do any more of this task type?
if h.Max > -1 && int(h.Count.Load()) == h.Max {
log.Infow("did not accept task", "name", h.Name, "reason", "at max already")
return false
defer h.TaskEngine.workAdderMutex.Unlock()
// 2. Can we do any more work?
err := h.AssertMachineHasCapacity()
if err != nil {
return false
// 3. What does the impl say?
tID, err := h.CanAccept(ids)
if err != nil {
return false
if tID == nil {
log.Infow("did not accept task", "task_id", ids[0], "reason", "CanAccept() refused")
return false
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
return false
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken")
return false
go func() {
var done bool
var doErr error
workStart := time.Now()
defer func() {
if r := recover(); r != nil {
log.Error("Recovered from a serious error "+
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r)
h.recordCompletion(*tID, workStart, done, doErr)
if done {
h.TaskEngine.tryAllWork <- true // Activate tasks in this machine
done, doErr = h.Do(*tID, func() bool {
var owner int
// Background here because we don't want GracefulRestart to block this save.
err := h.TaskEngine.db.QueryRow(context.Background(),
`SELECT owner_id FROM harmony_task WHERE id=$1`, *tID).Scan(&owner)
if err != nil {
log.Error("Cannot determine ownership: ", err)
return false
return owner == h.TaskEngine.ownerID
if doErr != nil {
log.Error("Do("+h.Name+", taskID="+strconv.Itoa(int(*tID))+") returned error: ", doErr)
return true
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
if err != nil {
log.Error("Could not log completion: ", err)
return false
result := "unspecified error"
if done {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
log.Error("Could not log completion: ", err)
return false
result = ""
} else {
if doErr != nil {
result = "error: " + doErr.Error()
var deleteTask bool
if h.MaxFailures > 0 {
ct := uint(0)
err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history
WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct)
if err != nil {
log.Error("Could not read task history:", err)
return false
if ct >= h.MaxFailures {
deleteTask = true
if deleteTask {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
log.Error("Could not delete failed job: ", err)
return false
// Note: Extra Info is left laying around for later review & clean-up
} else {
tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID)
if err != nil {
log.Error("Could not disown failed task: ", tID, err)
return false
_, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, err)
VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result)
if err != nil {
log.Error("Could not write history: ", err)
return false
return true
if err != nil {
log.Error("Could not record transaction: ", err)
if !cm {
log.Error("Committing the task records failed")
func (h *taskTypeHandler) AssertMachineHasCapacity() error {
r := h.TaskEngine.resourcesInUse()
if r.Cpu-h.Cost.Cpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of cpu")
if h.Cost.Ram > r.Ram {
return errors.New("Did not accept " + h.Name + " task: out of RAM")
if r.Gpu-h.Cost.Gpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
return nil
var hClient = http.Client{}
func init() {
hClient.Timeout = 3 * time.Second
// triggerCompletionListeners does in order:
// 1. Trigger all in-process followers (b/c it's fast).
// 2. Trigger all living processes with followers via DB
// 3. Future followers (think partial upgrade) can read harmony_task_history
// 3a. The Listen() handles slow follows.
func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
// InProcess (#1 from Description)
inProcessDefs := h.TaskEngine.follows[h.Name]
inProcessFollowers := make([]string, len(inProcessDefs))
for _, fs := range inProcessDefs {
if fs.f(tID, fs.h.AddTask) {
inProcessFollowers = append(inProcessFollowers, fs.h.Name)
// Over HTTP (#2 from Description)
var hps []struct {
HostAndPort string
ToType string
err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type
FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id
WHERE from_type=$1 AND to_type NOT IN $2 AND f.owner_id != $3`,
h.Name, inProcessFollowers, h.TaskEngine.ownerID)
if err != nil {
log.Warn("Could not fast-trigger partner processes.", err)
hostsVisited := map[string]bool{}
tasksVisited := map[string]bool{}
for _, v := range hps {
if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] {
resp, err := hClient.Get(v.HostAndPort + "/scheduler/follows/" + h.Name)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
log.Error("IO failed for fast nudge: ", string(b))
Normal file
Normal file
@ -0,0 +1,22 @@
//go:build darwin || freebsd || openbsd || dragonfly || netbsd
// +build darwin freebsd openbsd dragonfly netbsd
package resources
import (
func sysctlUint64(name string) (uint64, error) {
s, err := syscall.Sysctl(name)
if err != nil {
return 0, err
// hack because the string conversion above drops a \0
b := []byte(s)
if len(b) < 8 {
b = append(b, 0)
return binary.LittleEndian.Uint64(b), nil
Normal file
Normal file
@ -0,0 +1,180 @@
package resources
import (
cl "github.com/Nv7-Github/go-cl"
ffi "github.com/filecoin-project/filecoin-ffi"
logging "github.com/ipfs/go-log/v2"
var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
type Resources struct {
Cpu int
Gpu float64
GpuRam uint64
Ram uint64
MachineID int
type Reg struct {
shutdown atomic.Bool
var logger = logging.Logger("harmonytask")
var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted")
func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
var reg Reg
var err error
reg.Resources, err = getResources()
if err != nil {
return nil, err
ctx := context.Background()
{ // Learn our owner_id while updating harmony_machines
var ownerID []int
err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
if len(ownerID) == 0 {
err = db.QueryRow(ctx, `INSERT INTO harmony_machines
(host_and_port, cpu, ram, gpu, gpuram) VALUES
($1,$2,$3,$4,$5) RETURNING id`,
hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam).Scan(®.Resources.MachineID)
if err != nil {
return nil, err
} else {
reg.MachineID = ownerID[0]
_, err := db.Exec(ctx, `UPDATE harmony_machines SET
cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$6`,
reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam, reg.Resources.MachineID)
if err != nil {
return nil, err
CleanupMachines(context.Background(), db)
go func() {
for {
if reg.shutdown.Load() {
_, err := db.Exec(ctx, `UPDATE harmony_machines SET last_contact=CURRENT_TIMESTAMP`)
if err != nil {
logger.Error("Cannot keepalive ", err)
return ®, nil
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
if err != nil {
logger.Warn("unable to delete old machines: ", err)
return ct
func (res *Reg) Shutdown() {
func getResources() (res Resources, err error) {
b, err := exec.Command(`ps`, `-ef`).CombinedOutput()
if err != nil {
logger.Warn("Could not safety check for 2+ processes: ", err)
} else {
found := 0
for _, b := range bytes.Split(b, []byte("\n")) {
if lotusRE.Match(b) {
if found > 1 {
logger.Error("This Lotus process should run alone on a machine. Use CGroup.")
res = Resources{
Cpu: runtime.NumCPU(),
Ram: memory.FreeMemory(),
GpuRam: getGpuRam(),
{ // GPU boolean
gpus, err := ffi.GetGPUDevices()
if err != nil {
logger.Errorf("getting gpu devices failed: %+v", err)
all := strings.ToLower(strings.Join(gpus, ","))
if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") {
res.Gpu = 1
return res, nil
func getGpuRam() uint64 {
platforms, err := cl.GetPlatforms()
if err != nil {
return 0
return uint64(lo.SumBy(platforms, func(p *cl.Platform) int64 {
d, err := p.GetDevices(cl.DeviceTypeAll)
if err != nil {
return 0
return lo.SumBy(d, func(d *cl.Device) int64 { return d.GlobalMemSize() })
func DiskFree(path string) (uint64, error) {
s := unix.Statfs_t{}
err := unix.Statfs(path, &s)
if err != nil {
return 0, err
return s.Bfree * uint64(s.Bsize), nil
/* NOT for Darwin.
func GetMemFree() uint64 {
in := unix.Sysinfo_t{}
err := unix.Sysinfo(&in)
if err != nil {
return 0
// If this is a 32-bit system, then these fields are
// uint32 instead of uint64.
// So we always convert to uint64 to match signature.
return uint64(in.Freeram) * uint64(in.Unit)
Reference in New Issue
Block a user