From 225f093103ae83cda8bf8b3567b78845c04f9de8 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 14 Aug 2023 11:42:39 -0500 Subject: [PATCH] oops committed a patch --- htask.patch | 1660 --------------------------------------------------- 1 file changed, 1660 deletions(-) delete mode 100644 htask.patch diff --git a/htask.patch b/htask.patch deleted file mode 100644 index 0c2d12b4e..000000000 --- a/htask.patch +++ /dev/null @@ -1,1660 +0,0 @@ -diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go -index 944791275..995a3cbe0 100644 ---- a/cmd/lotus-worker/main.go -+++ b/cmd/lotus-worker/main.go -@@ -609,6 +609,7 @@ var runCmd = &cli.Command{ - if err := srv.Shutdown(context.TODO()); err != nil { - log.Errorf("shutting down RPC server failed: %s", err) - } -+ //taskManager.GracefullyTerminate(5*time.Hour) - log.Warn("Graceful shutdown successful") - }() - -diff --git a/go.mod b/go.mod -index 2da784ad6..661495e89 100644 ---- a/go.mod -+++ b/go.mod -@@ -156,7 +156,7 @@ require ( - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 - golang.org/x/net v0.10.0 - golang.org/x/sync v0.2.0 -- golang.org/x/sys v0.9.0 -+ golang.org/x/sys v0.10.0 - golang.org/x/term v0.9.0 - golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 - golang.org/x/tools v0.9.1 -@@ -167,6 +167,8 @@ require ( - - require ( - github.com/GeertJohan/go.incremental v1.0.0 // indirect -+ github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006 // indirect -+ github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef // indirect - github.com/PuerkitoBio/purell v1.1.1 // indirect - github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect - github.com/StackExchange/wmi v1.2.1 // indirect -@@ -177,8 +179,10 @@ require ( - github.com/beorn7/perks v1.0.1 // indirect - github.com/bep/debounce v1.2.1 // indirect - github.com/boltdb/bolt v1.3.1 // indirect -+ github.com/bytedance/sonic v1.9.1 // indirect - github.com/cespare/xxhash v1.1.0 // indirect - 
github.com/cespare/xxhash/v2 v2.2.0 // indirect -+ github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect - github.com/cilium/ebpf v0.9.1 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect - github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect -@@ -202,7 +206,10 @@ require ( - github.com/flynn/noise v1.0.0 // indirect - github.com/francoispqt/gojay v1.2.13 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect -+ github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/gdamore/encoding v1.0.0 // indirect -+ github.com/gin-contrib/sse v0.1.0 // indirect -+ github.com/gin-gonic/gin v1.9.1 // indirect - github.com/go-kit/log v0.2.1 // indirect - github.com/go-logfmt/logfmt v0.5.1 // indirect - github.com/go-logr/logr v1.2.4 // indirect -@@ -211,7 +218,11 @@ require ( - github.com/go-openapi/jsonpointer v0.19.3 // indirect - github.com/go-openapi/jsonreference v0.19.4 // indirect - github.com/go-openapi/swag v0.19.11 // indirect -+ github.com/go-playground/locales v0.14.1 // indirect -+ github.com/go-playground/universal-translator v0.18.1 // indirect -+ github.com/go-playground/validator/v10 v10.14.0 // indirect - github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect -+ github.com/goccy/go-json v0.10.2 // indirect - github.com/godbus/dbus/v5 v5.1.0 // indirect - github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.1.0 // indirect -@@ -256,10 +267,12 @@ require ( - github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect - github.com/josharian/intern v1.0.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect -+ github.com/json-iterator/go v1.1.12 // indirect - github.com/kilic/bls12-381 v0.1.0 // indirect - github.com/klauspost/compress v1.16.5 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect - github.com/koron/go-ssdp v0.0.4 // indirect -+ github.com/leodido/go-urn v1.2.4 // indirect - 
github.com/libp2p/go-cidranger v1.1.0 // indirect - github.com/libp2p/go-flow-metrics v0.1.0 // indirect - github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect -@@ -280,6 +293,8 @@ require ( - github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect - github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect - github.com/minio/sha256-simd v1.0.1 // indirect -+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect -+ github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect - github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect -@@ -291,6 +306,7 @@ require ( - github.com/opencontainers/runtime-spec v1.0.2 // indirect - github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect -+ github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect -@@ -306,12 +322,15 @@ require ( - github.com/rivo/uniseg v0.1.0 // indirect - github.com/rs/cors v1.7.0 // indirect - github.com/russross/blackfriday/v2 v2.1.0 // indirect -+ github.com/samber/lo v1.38.1 // indirect -+ github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad // indirect - github.com/shirou/gopsutil v2.18.12+incompatible // indirect - github.com/sirupsen/logrus v1.9.0 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/tidwall/gjson v1.14.4 // indirect -+ github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/twmb/murmur3 v1.1.6 // indirect -- github.com/ugorji/go/codec v1.2.6 // indirect -+ github.com/ugorji/go/codec v1.2.11 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasttemplate v1.0.1 // indirect - github.com/whyrusleeping/cbor 
v0.0.0-20171005072247-63513f603b11 // indirect -@@ -327,6 +346,7 @@ require ( - go.opentelemetry.io/otel/trace v1.16.0 // indirect - go.uber.org/dig v1.17.0 // indirect - go4.org v0.0.0-20230225012048-214862532bf5 // indirect -+ golang.org/x/arch v0.3.0 // indirect - golang.org/x/mod v0.10.0 // indirect - golang.org/x/text v0.10.0 // indirect - gonum.org/v1/gonum v0.13.0 // indirect -diff --git a/go.sum b/go.sum -index ebbc4dcc8..74127c535 100644 ---- a/go.sum -+++ b/go.sum -@@ -59,6 +59,8 @@ github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZ - github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4= - github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K172oDhSKU0dJ/miJramo9NITOMyZQ= - github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY= -+github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006 h1:TKWkFaRW5EPQyrS1pM0vm3vvqw/jmHu+FkV8gRD+7/w= -+github.com/Inkeliz/go-opencl v0.0.0-20200806180703-5f0707fba006/go.mod h1:9ILtD1/UTP/Y7JMCU8loWZMDvhrQuTgHzHatG6z9ZdQ= - github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= - github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= - github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa h1:1PPxEyGdIGVkX/kqMvLJ95a1dGS1Sz7tpNEgehEYYt0= -@@ -66,6 +68,8 @@ github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa/go.mod h1:WUmMvh9wMtq - github.com/Masterminds/glide v0.13.2/go.mod h1:STyF5vcenH/rUqTEv+/hBXlSTo7KYwg2oc2f4tzPWic= - github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= - github.com/Masterminds/vcs v1.13.0/go.mod h1:N09YCmOQr6RLxC6UNHzuVwAdodYbbnycGHSmwVJjcKA= -+github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef h1:DiNnYI6NBdeXGOJXptJcrYeDavJf4tImz/B4MOVQtMs= 
-+github.com/Nv7-Github/go-cl v0.0.0-20210426150049-f121093b60ef/go.mod h1:RRVtxaQlBBnbo+n2fgYHhxQmXDkRLKWcWX93lJL0Yhw= - github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= - github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= - github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= -@@ -142,6 +146,9 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f - github.com/buger/goterm v1.0.3 h1:7V/HeAQHrzPk/U4BvyH2g9u+xbUW9nr4yRPyG59W4fM= - github.com/buger/goterm v1.0.3/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE= - github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -+github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -+github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= -+github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= - github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= - github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= - github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -@@ -152,6 +159,9 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL - github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= - github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= - github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= -+github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -+github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -+github.com/chenzhuoyu/base64x 
v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= - github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= - github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM= - github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ= -@@ -386,6 +396,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo - github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= - github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= - github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -+github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= -+github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= - github.com/gbrlsnchs/jwt/v3 v3.0.1 h1:lbUmgAKpxnClrKloyIwpxm4OuWeDl5wLk52G91ODPw4= - github.com/gbrlsnchs/jwt/v3 v3.0.1/go.mod h1:AncDcjXz18xetI3A6STfXq2w+LuTx8pQ8bGEwRN8zVM= - github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko= -@@ -399,6 +411,8 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE - github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= - github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= - github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -+github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= -+github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= - github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= - github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= - github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs= 
-@@ -445,10 +459,16 @@ github.com/go-openapi/swag v0.19.11/go.mod h1:Uc0gKkdR+ojzsEpjh39QChyu92vPgIr72P - github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= - github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= - github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -+github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= -+github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= - github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= - github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -+github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= -+github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= - github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= - github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -+github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= -+github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= - github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= - github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= - github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -@@ -464,6 +484,8 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= - github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= - github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= - 
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= -+github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -+github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= - github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= - github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= - github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= -@@ -970,6 +992,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= - github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= - github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= - github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -+github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= -+github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= - github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= - github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= - github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= -@@ -1405,7 +1429,10 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= - github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= -+github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= - github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -+github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= 
-+github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= - github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= - github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= - github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= -@@ -1512,6 +1539,10 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf - github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= - github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= - github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -+github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= -+github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= -+github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad h1:zyvTnsJPPAqVg2v3bbvTI+RdbVPJufZ+CWCPOX0Dtp8= -+github.com/samuel/go-opencl v0.0.0-20171108220231-cbcfd10c32ad/go.mod h1:KCqoxhWgoxCWg13iOq53YFf50jlonuuhIpO916aWEkg= - github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= - github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= - github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= -@@ -1598,6 +1629,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ - github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= - github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= - github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 
-+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -+github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= - github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= - github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= - github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= -@@ -1618,6 +1651,8 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= - github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= - github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= - github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -+github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= -+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= - github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= - github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= - github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= -@@ -1628,6 +1663,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT - github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= - github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ= - github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= -+github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= -+github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= - github.com/urfave/cli v1.20.0/go.mod 
h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= - github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= - github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -@@ -1789,6 +1826,9 @@ go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1 - go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg= - go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc= - go4.org v0.0.0-20230225012048-214862532bf5/go.mod h1:F57wTi5Lrj6WLyswp5EYV1ncrEbFGHD4hhz6S1ZYeaU= -+golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= -+golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= -+golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= - golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw= - golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= - golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -@@ -2066,6 +2106,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= - golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= - golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= - golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -+golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -+golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= - golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= - golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= - golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -@@ -2319,6 +2361,7 @@ lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1 - nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= - nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= - rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= -+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= - rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= - rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= - sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= -diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go -new file mode 100644 -index 000000000..2c8523d82 ---- /dev/null -+++ b/itests/harmonytask_test.go -@@ -0,0 +1,247 @@ -+package itests -+ -+import ( -+ "context" -+ "errors" -+ "fmt" -+ "sort" -+ "strings" -+ "sync" -+ "testing" -+ "time" -+ -+ "github.com/filecoin-project/lotus/itests/kit" -+ "github.com/filecoin-project/lotus/lib/harmony/harmonydb" -+ "github.com/filecoin-project/lotus/lib/harmony/harmonytask" -+ "github.com/filecoin-project/lotus/lib/harmony/resources" -+ "github.com/filecoin-project/lotus/node/impl" -+ "github.com/stretchr/testify/require" -+) -+ -+type task1 struct { -+ toAdd []int -+ myPersonalTableLock sync.Mutex -+ myPersonalTable map[harmonytask.TaskID]int // This would typicallyb be a DB table -+ WorkCompleted []string -+} -+ -+func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { -+ if !stillOwned() { -+ return false, errors.New("Why not still owned?") -+ } -+ t.myPersonalTableLock.Lock() -+ defer t.myPersonalTableLock.Unlock() -+ t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID])) -+ return true, nil -+} -+func (t *task1) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { 
-+ return &list[0], nil -+} -+func (t *task1) TypeDetails() harmonytask.TaskTypeDetails { -+ return harmonytask.TaskTypeDetails{ -+ Max: 100, -+ Name: "ThingOne", -+ MaxFailures: 1, -+ Cost: resources.Resources{ -+ Cpu: 1, -+ Ram: 100 << 10, // at 100kb, it's tiny -+ }, -+ } -+} -+func (t *task1) Adder(add harmonytask.AddTaskFunc) { -+ for _, v := range t.toAdd { -+ add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { -+ t.myPersonalTableLock.Lock() -+ defer t.myPersonalTableLock.Unlock() -+ -+ t.myPersonalTable[tID] = v -+ return true -+ }) -+ } -+} -+ -+func TestHarmonyTasks(t *testing.T) { -+ withSetup(t, func(m *kit.TestMiner) { -+ cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB -+ t1 := &task1{ -+ toAdd: []int{56, 73}, -+ myPersonalTable: map[harmonytask.TaskID]int{}, -+ } -+ e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") -+ require.NoError(t, err) -+ time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. -+ e.GracefullyTerminate(time.Minute) -+ require.Equal(t, t1.WorkCompleted, 2, "wrong amount of work complete: expected 2 got:") -+ sort.Strings(t1.WorkCompleted) -+ got := strings.Join(t1.WorkCompleted, ",") -+ expected := "taskResult56,taskResult73" -+ if got != expected { -+ t.Fatal("Unexpected results! Wanted " + expected + " got " + got) -+ } -+ // TODO test history table looks right. 
-+ }) -+} -+ -+type passthru struct { -+ dtl harmonytask.TaskTypeDetails -+ do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) -+ canAccept func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) -+ adder func(add harmonytask.AddTaskFunc) -+} -+ -+func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { -+ return t.do(tID, stillOwned) -+} -+func (t *passthru) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { -+ return t.canAccept(list) -+} -+func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails { -+ return t.dtl -+} -+func (t *passthru) Adder(add harmonytask.AddTaskFunc) { -+ if t.adder != nil { -+ t.adder(add) -+ } -+} -+ -+// Common stuff -+var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}} -+var letters []string -+var lettersMutex sync.Mutex -+ -+func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru { -+ return &passthru{ -+ dtl: dtl, -+ canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil }, -+ adder: func(add harmonytask.AddTaskFunc) { -+ for _, v := range []string{"A", "B"} { -+ add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { -+ _, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v) -+ require.NoError(t, err) -+ return true -+ }) -+ } -+ }, -+ } -+} -+func fooLetterSaver(t *testing.T, cdb *harmonydb.DB) *passthru { -+ return &passthru{ -+ dtl: dtl, -+ canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, -+ do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { -+ var content string -+ err = cdb.QueryRow(context.Background(), -+ "SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content) -+ require.NoError(t, err) -+ lettersMutex.Lock() -+ defer lettersMutex.Unlock() -+ letters = append(letters, content) -+ return true, nil -+ }, -+ } -+} -+ -+func 
TestHarmonyTasksWith2PartiesPolling(t *testing.T) { -+ withSetup(t, func(m *kit.TestMiner) { -+ cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB -+ senderParty := fooLetterAdder(t, cdb) -+ workerParty := fooLetterSaver(t, cdb) -+ harmonytask.POLL_DURATION = time.Millisecond * 100 -+ sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1") -+ require.NoError(t, err) -+ worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2") -+ require.NoError(t, err) -+ time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. -+ sender.GracefullyTerminate(time.Second * 5) -+ worker.GracefullyTerminate(time.Second * 5) -+ sort.Strings(letters) -+ require.Equal(t, letters, []string{"A", "B"}) -+ }) -+} -+ -+func TestWorkStealing(t *testing.T) { -+ withSetup(t, func(m *kit.TestMiner) { -+ cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB -+ ctx := context.Background() -+ -+ // The dead worker will be played by a few SQL INSERTS. -+ _, err := cdb.Exec(ctx, `INSERT INTO harmony_machines -+ (id, last_contact,host_and_port, cpu, ram, gpu, gpuram) -+ VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`) -+ require.ErrorIs(t, err, nil) -+ _, err = cdb.Exec(ctx, `INSERT INTO harmony_task -+ (id, name, owner_id, posted_time, added_by) -+ VALUES (1234, 'foo', 300, DATE '2000-01-01', 300)`) -+ require.ErrorIs(t, err, nil) -+ _, err = cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int, content) VALUES (1234, 'M')") -+ require.ErrorIs(t, err, nil) -+ -+ harmonytask.POLL_DURATION = time.Millisecond * 100 -+ harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100 -+ worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb)}, "test:2") -+ require.ErrorIs(t, err, nil) -+ time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. 
-+ worker.GracefullyTerminate(time.Second * 5) -+ require.Equal(t, []string{"M"}, letters) -+ }) -+} -+ -+func TestTaskRetry(t *testing.T) { -+ withSetup(t, func(m *kit.TestMiner) { -+ cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB -+ senderParty := fooLetterAdder(t, cdb) -+ harmonytask.POLL_DURATION = time.Millisecond * 100 -+ sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1") -+ require.NoError(t, err) -+ -+ alreadyFailed := map[string]bool{} -+ fails2xPerMsg := &passthru{ -+ dtl: dtl, -+ canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, -+ do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { -+ var content string -+ err = cdb.QueryRow(context.Background(), -+ "SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content) -+ require.NoError(t, err) -+ lettersMutex.Lock() -+ defer lettersMutex.Unlock() -+ if !alreadyFailed[content] { -+ alreadyFailed[content] = true -+ return false, errors.New("intentional 'error'") -+ } -+ letters = append(letters, content) -+ return true, nil -+ }, -+ } -+ rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2") -+ require.NoError(t, err) -+ time.Sleep(3 * time.Second) -+ sender.GracefullyTerminate(time.Hour) -+ rcv.GracefullyTerminate(time.Hour) -+ sort.Strings(letters) -+ require.Equal(t, []string{"A", "B"}, letters) -+ type hist struct { -+ TaskID int -+ Result bool -+ Err string -+ } -+ var res []hist -+ require.NoError(t, cdb.Select(context.Background(), &res, -+ `SELECT task_id, result, err FROM harmony_task_history -+ ORDER BY result DESC, task_id`)) -+ -+ require.Equal(t, []hist{ -+ {1, true, ""}, -+ {2, true, ""}, -+ {1, false, "error: intentional 'error'"}, -+ {2, false, "error: intentional 'error'"}}, res) -+ }) -+} -+ -+/* -+FUTURE test fast-pass round-robin via http calls (3party) once the API for that is set -+It's necessary for WinningPoSt. 
-+ -+FUTURE test follows. -+It's necessary for sealing work. -+*/ -diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go -index fd31e7a13..48e3db6fa 100644 ---- a/lib/harmony/harmonydb/harmonydb.go -+++ b/lib/harmony/harmonydb/harmonydb.go -@@ -118,21 +118,25 @@ type tracer struct { - - type ctxkey string - --var sqlStart = ctxkey("sqlStart") -+const SQL_START = ctxkey("sqlStart") -+const SQL_STRING = ctxkey("sqlString") - - func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context { -- return context.WithValue(ctx, sqlStart, time.Now()) -+ return context.WithValue(context.WithValue(ctx, SQL_START, time.Now()), SQL_STRING, data.SQL) - } - func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) { - DBMeasures.Hits.M(1) -- ms := time.Since(ctx.Value(sqlStart).(time.Time)).Milliseconds() -+ ms := time.Since(ctx.Value(SQL_START).(time.Time)).Milliseconds() - DBMeasures.TotalWait.M(ms) - DBMeasures.Waits.Observe(float64(ms)) - if data.Err != nil { - DBMeasures.Errors.M(1) - } -- // Can log what type of query it is, but not what tables -- // Can log rows affected. -+ logger.Debugw("SQL run", -+ "query", ctx.Value(SQL_STRING).(string), -+ "err", data.Err, -+ "rowCt", data.CommandTag.RowsAffected(), -+ "milliseconds", ms) - } - - // addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB. -@@ -250,8 +254,9 @@ func (db *DB) upgrade() error { - } - _, err = db.pgx.Exec(context.Background(), s) - if err != nil { -- db.log(fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())) -- return err -+ msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()) -+ db.log(msg) -+ return errors.New(msg) // makes devs lives easier by placing message at the end. 
- } - } - -diff --git a/lib/harmony/harmonydb/sql/20230706.sql b/lib/harmony/harmonydb/sql/20230706.sql -index b45aca7fa..a4a333b81 100644 ---- a/lib/harmony/harmonydb/sql/20230706.sql -+++ b/lib/harmony/harmonydb/sql/20230706.sql -@@ -2,5 +2,6 @@ CREATE TABLE itest_scratch ( - id SERIAL PRIMARY KEY, - content TEXT, - some_int INTEGER, -+ second_int INTEGER, - update_time TIMESTAMP DEFAULT current_timestamp - ) -\ No newline at end of file -diff --git a/lib/harmony/harmonydb/sql/20230719.sql b/lib/harmony/harmonydb/sql/20230719.sql -new file mode 100644 -index 000000000..0a676526b ---- /dev/null -+++ b/lib/harmony/harmonydb/sql/20230719.sql -@@ -0,0 +1,52 @@ -+/* For HarmonyTask base implementation. */ -+ -+CREATE TABLE harmony_machines ( -+ id SERIAL PRIMARY KEY NOT NULL, -+ last_contact TIMESTAMP NOT NULL DEFAULT current_timestamp, -+ host_and_port varchar(300) NOT NULL, -+ cpu INTEGER NOT NULL, -+ ram BIGINT NOT NULL, -+ gpu FLOAT NOT NULL, -+ gpuram BIGINT NOT NULL -+); -+ -+CREATE TABLE harmony_task ( -+ id SERIAL PRIMARY KEY NOT NULL, -+ initiated_by INTEGER, -+ update_time TIMESTAMP NOT NULL DEFAULT current_timestamp, -+ posted_time TIMESTAMP NOT NULL, -+ owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL, -+ added_by INTEGER NOT NULL, -+ previous_task INTEGER, -+ name varchar(8) NOT NULL -+); -+COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.'; -+COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.'; -+COMMENT ON COLUMN harmony_task.name IS 'The name of the task type.'; -+COMMENT ON COLUMN harmony_task.owner_id IS 'may be null if between owners or not yet taken'; -+COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified. 
not a heartbeat'; -+ -+CREATE TABLE harmony_task_history ( -+ id SERIAL PRIMARY KEY NOT NULL, -+ task_id INTEGER NOT NULL, -+ name VARCHAR(8) NOT NULL, -+ posted TIMESTAMP NOT NULL, -+ work_start TIMESTAMP NOT NULL, -+ work_end TIMESTAMP NOT NULL, -+ result BOOLEAN NOT NULL, -+ err varchar -+); -+COMMENT ON COLUMN harmony_task_history.result IS 'Use to detemine if this was a successful run.'; -+ -+CREATE TABLE harmony_task_follow ( -+ id SERIAL PRIMARY KEY NOT NULL, -+ owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE, -+ to_type VARCHAR(8) NOT NULL, -+ from_type VARCHAR(8) NOT NULL -+); -+ -+CREATE TABLE harmony_task_impl ( -+ id SERIAL PRIMARY KEY NOT NULL, -+ owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE, -+ name VARCHAR(8) NOT NULL -+); -\ No newline at end of file -diff --git a/lib/harmony/harmonytask/doc.go b/lib/harmony/harmonytask/doc.go -new file mode 100644 -index 000000000..357c3e15c ---- /dev/null -+++ b/lib/harmony/harmonytask/doc.go -@@ -0,0 +1,79 @@ -+/* -+ Package harmomnytask implements a pure (no task logic), distributed -+ task manager. This clean interface allows a task implementer to completely -+ -+avoid being concerned with task scheduling and management. -+It's based on the idea of tasks as small units of work broken from other -+work by hardware, parallelizabilty, reliability, or any other reason. -+Workers will be Greedy: vaccuuming up their favorite jobs from a list. -+Once 1 task is accepted, harmonydb tries to get other task runner -+machines to accept work (round robin) before trying again to accept. -+* -+Mental Model: -+ -+ Things that block tasks: -+ - task not registered for any running server -+ - max was specified and reached -+ - resource exhaustion -+ - CanAccept() interface (per-task implmentation) does not accept it. 
-+ Ways tasks start: (slowest first) -+ - DB Read every 1 minute -+ - Bump via HTTP if registered in DB -+ - Task was added (to db) by this process -+ Ways tasks get added: -+ - Async Listener task (for chain, etc) -+ - Followers: Tasks get added because another task completed -+ When Follower collectors run: -+ - If both sides are process-local, then -+ - Otherwise, at the listen interval during db scrape -+ How duplicate tasks are avoided: -+ - that's up to the task definition, but probably a unique key -+ -+* -+To use: -+1.Implement TaskInterface for a new task. -+2 Have New() receive this & all other ACTIVE implementations. -+* -+* -+As we are not expecting DBAs in this database, it's important to know -+what grows uncontrolled. The only harmony_* table is _task_history -+(somewhat quickly) and harmony_machines (slowly). These will need a -+clean-up for after the task data could never be acted upon. -+but the design **requires** extraInfo tables to grow until the task's -+info could not possibly be used by a following task, including slow -+release rollout. This would normally be in the order of months old. -+* -+Other possible enhancements include more collaboative coordination -+to assign a task to machines closer to the data. -+ -+__Database_Behavior__ -+harmony_task is the list of work that has not been completed. -+ -+ AddTaskFunc manages the additions, but is designed to have its -+ transactions failed-out on overlap with a similar task already written. -+ It's up to the TaskInterface implementer to discover this overlap via -+ some other table it uses (since overlap can mean very different things). -+ -+harmony_task_history -+ -+ This holds transactions that completed or saw too many retries. It also -+ serves as input for subsequent (follower) tasks to kick off. This is not -+ done machine-internally because a follower may not be on the same machine -+ as the previous task. 
-+ -+harmony_task_machines -+ -+ Managed by lib/harmony/resources, this is a reference to machines registered -+ via the resources. This registration does not obligate the machine to -+ anything, but serves as a discovery mechanism. Paths are hostnames + ports -+ which are presumed to support http, but this assumption is only used by -+ the task system. -+ -+harmony_task_follow / harmony_task_impl -+ -+ These tables are used to fast-path notifications to other machines instead -+ of waiting for polling. _impl helps round-robin work pick-up. _follow helps -+ discover the machines that are interested in creating tasks following the -+ task that just completed. -+*/ -+package harmonytask -diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go -new file mode 100644 -index 000000000..1f5662959 ---- /dev/null -+++ b/lib/harmony/harmonytask/harmonytask.go -@@ -0,0 +1,386 @@ -+package harmonytask -+ -+import ( -+ "context" -+ "fmt" -+ "strconv" -+ "sync/atomic" -+ "time" -+ -+ "github.com/filecoin-project/lotus/lib/harmony/resources" -+ "github.com/gin-gonic/gin" -+ -+ "github.com/filecoin-project/lotus/lib/harmony/harmonydb" -+) -+ -+// Consts (except for unit test) -+var POLL_DURATION = time.Minute // Poll for Work this frequently -+var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone -+ -+type TaskTypeDetails struct { -+ // Max returns how many tasks this machine can run of this type. -+ // Negative means unrestricted. -+ Max int -+ -+ // Name is the task name to be added to the task list. -+ Name string -+ -+ // Peak costs to Do() the task. -+ Cost resources.Resources -+ -+ // Max Failure count before the job is dropped. -+ // 0 = retry forever -+ MaxFailures uint -+ -+ // Follow another task's completion via this task's creation. -+ // The function should populate extraInfo from data -+ // available from the previous task's tables, using the given TaskID. 
-+ // It should also return success if the trigger succeeded. -+ // NOTE: if refatoring tasks, see if your task is -+ // necessary. Ex: Is the sector state correct for your stage to run? -+ Follows map[string]func(TaskID, AddTaskFunc) bool -+} -+ -+// TaskInterface must be implemented in order to have a task used by harmonytask. -+type TaskInterface interface { -+ // Do the task assigned. Call stillOwned before making single-writer-only -+ // changes to ensure the work has not been stolen. -+ // This is the ONLY function that should attempt to do the work, and must -+ // ONLY be called by harmonytask. -+ // Indicate if the task no-longer needs scheduling with done=true including -+ // cases where it's past the deadline. -+ Do(taskID TaskID, stillOwned func() bool) (done bool, err error) -+ -+ // CanAccept should return if the task can run on this machine. It should -+ // return null if the task type is not allowed on this machine. -+ // It should select the task it most wants to accomplish. -+ // It is also responsible for determining disk space (including scratch). -+ CanAccept([]TaskID) (*TaskID, error) -+ -+ // TypeDetails() returns static details about how this task behaves and -+ // how this machine will run it. Read once at the beginning. -+ TypeDetails() TaskTypeDetails -+ -+ // This listener will consume all external sources continuously for work. -+ // Do() may also be called from a backlog of work. This must not -+ // start doing the work (it still must be scheduled). -+ // Note: Task de-duplication should happen in ExtraInfoFunc by -+ // returning false, typically by determining from the tx that the work -+ // exists already. The easy way is to have a unique joint index -+ // across all fields that will be common. -+ // Adder should typically only add its own task type, but multiple -+ // is possible for when 1 trigger starts 2 things. 
-+ // Usage Example: -+ // func (b *BazType)Adder(addTask AddTaskFunc) { -+ // for { -+ // bazMaker := <- bazChannel -+ // addTask("baz", func(t harmonytask.TaskID, txn db.Transaction) bool { -+ // _, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot) -+ // VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot) -+ // if err != nil { -+ // scream(err) -+ // return false -+ // } -+ // return true -+ // }) -+ // } -+ // } -+ Adder(AddTaskFunc) -+} -+ -+type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool) -+ -+type TaskEngine struct { -+ ctx context.Context -+ handlers []*taskTypeHandler -+ db *harmonydb.DB -+ workAdderMutex *notifyingMx -+ reg *resources.Reg -+ grace context.CancelFunc -+ taskMap map[string]*taskTypeHandler -+ ownerID int -+ tryAllWork chan bool // notify if work completed -+ follows map[string][]followStruct -+ lastFollowTime time.Time -+ lastCleanup atomic.Value -+} -+type followStruct struct { -+ f func(TaskID, AddTaskFunc) bool -+ h *taskTypeHandler -+} -+ -+type TaskID int -+ -+// New creates all the task definitions. 
Note that TaskEngine -+// knows nothing about the tasks themselves and serves to be a -+// generic container for common work -+func New( -+ db *harmonydb.DB, -+ impls []TaskInterface, -+ hostnameAndPort string) (*TaskEngine, error) { -+ -+ reg, err := resources.Register(db, hostnameAndPort) -+ if err != nil { -+ return nil, fmt.Errorf("cannot get resources: %w", err) -+ } -+ ctx, grace := context.WithCancel(context.Background()) -+ e := &TaskEngine{ -+ ctx: ctx, -+ grace: grace, -+ db: db, -+ reg: reg, -+ ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" -+ workAdderMutex: &notifyingMx{}, -+ taskMap: make(map[string]*taskTypeHandler, len(impls)), -+ tryAllWork: make(chan bool), -+ follows: make(map[string][]followStruct), -+ } -+ e.lastCleanup.Store(time.Now()) -+ for _, c := range impls { -+ h := taskTypeHandler{ -+ TaskInterface: c, -+ TaskTypeDetails: c.TypeDetails(), -+ TaskEngine: e, -+ } -+ e.handlers = append(e.handlers, &h) -+ e.taskMap[h.TaskTypeDetails.Name] = &h -+ -+ _, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name) -+ VALUES ($1,$2)`, e.ownerID, h.Name) -+ if err != nil { -+ return nil, fmt.Errorf("can't update impl: %w", err) -+ } -+ -+ for name, fn := range c.TypeDetails().Follows { -+ e.follows[name] = append(e.follows[name], followStruct{fn, &h}) -+ -+ // populate harmony_task_follows -+ _, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task) -+ VALUES ($1,$2,$3)`, e.ownerID, name, h.Name) -+ if err != nil { -+ return nil, fmt.Errorf("can't update harmony_task_follows: %w", err) -+ } -+ } -+ } -+ -+ // resurrect old work -+ { -+ var taskRet []struct { -+ ID int -+ Name string -+ } -+ -+ err := db.Select(e.ctx, &taskRet, `SELECT id, name from harmony_task WHERE owner_id=$1`, e.ownerID) -+ if err != nil { -+ return nil, err -+ } -+ for _, w := range taskRet { -+ // edge-case: if old assignments are not available tasks, unlock them.
-+ h := e.taskMap[w.Name] -+ if h == nil { -+ _, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner=NULL WHERE id=$1`, w.ID) -+ if err != nil { -+ log.Error("Cannot remove self from owner field: ", err) -+ continue // not really fatal, but not great -+ } -+ } -+ if !h.considerWork([]TaskID{TaskID(w.ID)}) { -+ log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) -+ } -+ } -+ } -+ for _, h := range e.handlers { -+ go h.Adder(h.AddTask) -+ } -+ go e.poller() -+ -+ return e, nil -+} -+ -+// GracefullyTerminate hangs until all present tasks have completed. -+// Call this to cleanly exit the process. As some processes are long-running, -+// passing a deadline will ignore those still running (to be picked-up later). -+func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) { -+ e.grace() -+ e.reg.Shutdown() -+ deadlineChan := time.NewTimer(deadline).C -+ -+ // block bumps & follows by unreg from DBs. -+ _, err := e.db.Exec(context.Background(), `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID) -+ if err != nil { -+ log.Warn("Could not clean-up impl table: %w", err) -+ } -+ _, err = e.db.Exec(context.Background(), `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID) -+ if err != nil { -+ log.Warn("Could not clean-up impl table: %w", err) -+ } -+top: -+ for _, h := range e.handlers { -+ if h.Count.Load() > 0 { -+ select { -+ case <-deadlineChan: -+ return -+ default: -+ time.Sleep(time.Millisecond) -+ goto top -+ } -+ } -+ } -+} -+ -+func (e *TaskEngine) poller() { -+ for { -+ select { -+ case <-e.tryAllWork: ///////////////////// Find work after some work finished -+ case <-time.NewTicker(POLL_DURATION).C: // Find work periodically -+ case <-e.ctx.Done(): ///////////////////// Graceful exit -+ return -+ } -+ e.followWorkInDB() // "Follows" the slow way -+ e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way -+ } -+} -+ -+// followWorkInDB implements "Follows" the slow way -+func (e *TaskEngine) 
followWorkInDB() { -+ // Step 1: What are we following? -+ var lastFollowTime time.Time -+ lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now() -+ -+ for from_name, srcs := range e.follows { -+ var cList []int // Which work is done (that we follow) since we last checked? -+ err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history -+ WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, from_name) -+ if err != nil { -+ log.Error("Could not query DB: ", err) -+ return -+ } -+ for _, src := range srcs { -+ for _, workAlreadyDone := range cList { // Were any tasks made to follow these tasks? -+ var ct int -+ err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task -+ WHERE name=$1 AND previous_task=$2`, src.h.Name, workAlreadyDone).Scan(&ct) -+ if err != nil { -+ log.Error("Could not query harmony_task: ", err) -+ return // not recoverable here -+ } -+ if ct > 0 { -+ continue -+ } -+ // we need to create this task -+ if !src.h.Follows[from_name](TaskID(workAlreadyDone), src.h.AddTask) { -+ // But someone may have beaten us to it. 
-+ log.Infof("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, from_name) -+ } -+ } -+ } -+ } -+} -+ -+// pollerTryAllWork implements "Bumps" (next task) the slow way -+func (e *TaskEngine) pollerTryAllWork() { -+ if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { -+ e.lastCleanup.Store(time.Now()) -+ resources.CleanupMachines(e.ctx, e.db) -+ } -+ for _, v := range e.handlers { -+ rerun: -+ if v.AssertMachineHasCapacity() != nil { -+ continue -+ } -+ var unownedTasks []TaskID -+ err := e.db.Select(e.ctx, &unownedTasks, `SELECT id -+ FROM harmony_task -+ WHERE owner_id IS NULL AND name=$1 -+ ORDER BY update_time`, v.Name) -+ if err != nil { -+ log.Error("Unable to read work ", err) -+ continue -+ } -+ accepted := v.considerWork(unownedTasks) -+ if !accepted { -+ log.Warn("Work not accepted") -+ continue -+ } -+ if len(unownedTasks) > 1 { -+ e.bump(v.Name) // wait for others before trying again to add work. -+ goto rerun -+ } -+ } -+} -+ -+// AddHttpHandlers TODO this needs to be called by the http server to register routes. -+// This implements the receiver-side of "follows" and "bumps" the fast way. -+func (e *TaskEngine) AddHttpHandlers(root gin.IRouter) { -+ s := root.Group("/scheduler/") -+ f := s.Group("/follows") -+ for name, v := range e.follows { -+ f.GET("/"+name+"/:tID", func(c *gin.Context) { -+ tIDString := c.Param("tID") -+ tID, err := strconv.Atoi(tIDString) -+ if err != nil { -+ c.AbortWithError(401, err) -+ return -+ } -+ taskAdded := false -+ for _, v := range v { -+ taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask) -+ } -+ if taskAdded { -+ e.tryAllWork <- true -+ c.Status(200) -+ } -+ c.Status(202) // NOTE: 202 for "accepted" but not worked. 
-+ }) -+ } -+ b := s.Group("/bump") -+ for _, h := range e.handlers { -+ b.GET("/"+h.Name+"/:tID", func(c *gin.Context) { -+ tIDString := c.Param("tID") -+ tID, err := strconv.Atoi(tIDString) -+ if err != nil { -+ c.AbortWithError(401, err) -+ return -+ } -+ // We NEED to block while trying to deliver -+ // this work to ease the network impact. -+ if h.considerWork([]TaskID{TaskID(tID)}) { -+ c.Status(200) -+ } -+ c.Status(202) // NOTE: 202 for "accepted" but not worked. -+ }) -+ } -+} -+ -+func (e *TaskEngine) bump(taskType string) { -+ var res []string -+ err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m -+ JOIN harmony_task_impl i ON i.owner_id=m.id -+ WHERE i.name=$1`, taskType) -+ if err != nil { -+ log.Error("Could not read db for bump: ", err) -+ return -+ } -+ for _, url := range res { -+ resp, err := hClient.Get(url + "/scheduler/bump/" + taskType) -+ if err != nil { -+ log.Info("Server unreachable to bump: ", err) -+ continue -+ } -+ if resp.StatusCode == 200 { -+ return // just want 1 taker. -+ } -+ } -+} -+ -+// resourcesInUse requires workListsMutex to be already locked. 
-+func (e *TaskEngine) resourcesInUse() resources.Resources { -+ tmp := e.reg.Resources -+ for _, t := range e.handlers { -+ ct := t.Count.Load() -+ tmp.Cpu -= int(ct) * t.Cost.Cpu -+ tmp.Gpu -= float64(ct) * t.Cost.Gpu -+ tmp.Ram -= uint64(ct) * t.Cost.Ram -+ } -+ return tmp -+} -diff --git a/lib/harmony/harmonytask/notifyingMx.go b/lib/harmony/harmonytask/notifyingMx.go -new file mode 100644 -index 000000000..51c4e0a53 ---- /dev/null -+++ b/lib/harmony/harmonytask/notifyingMx.go -@@ -0,0 +1,16 @@ -+package harmonytask -+ -+import "sync" -+ -+type notifyingMx struct { -+ sync.Mutex -+ UnlockNotify func() -+} -+ -+func (n *notifyingMx) Unlock() { -+ tmp := n.UnlockNotify -+ n.Mutex.Unlock() -+ if tmp != nil { -+ tmp() -+ } -+} -diff --git a/lib/harmony/harmonytask/taskTypeHandler.go b/lib/harmony/harmonytask/taskTypeHandler.go -new file mode 100644 -index 000000000..079f33704 ---- /dev/null -+++ b/lib/harmony/harmonytask/taskTypeHandler.go -@@ -0,0 +1,276 @@ -+package harmonytask -+ -+import ( -+ "context" -+ "errors" -+ "io" -+ "net/http" -+ "strconv" -+ "sync/atomic" -+ "time" -+ -+ "github.com/filecoin-project/lotus/lib/harmony/harmonydb" -+ logging "github.com/ipfs/go-log/v2" -+) -+ -+var log = logging.Logger("harmonytask") -+ -+type taskTypeHandler struct { -+ TaskInterface -+ TaskTypeDetails -+ TaskEngine *TaskEngine -+ Count atomic.Int32 /// locked by TaskEngine's mutex -+ -+} -+ -+func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) { -+ var tID TaskID -+ did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool { -+ // create taskID (from DB) -+ _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) -+ VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) -+ if err != nil { -+ log.Error("Could not insert into harmonyTask", err) -+ return false -+ } -+ err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID) -+ if err != nil { -+ 
log.Error("Could not select ID: ", err) -+ } -+ return extra(tID, tx) -+ }) -+ if err != nil { -+ log.Error(err) -+ } -+ if !did { -+ return -+ } -+ -+ if !h.considerWork([]TaskID{tID}) { -+ h.TaskEngine.bump(h.Name) // We can't do it. How about someone else. -+ } -+} -+ -+func (h *taskTypeHandler) considerWork(ids []TaskID) (workAccepted bool) { -+ if len(ids) == 0 { -+ return true // stop looking for takers -+ } -+ -+ // 1. Can we do any more of this task type? -+ if h.Max > -1 && int(h.Count.Load()) == h.Max { -+ log.Infow("did not accept task", "name", h.Name, "reason", "at max already") -+ return false -+ } -+ -+ h.TaskEngine.workAdderMutex.Lock() -+ defer h.TaskEngine.workAdderMutex.Unlock() -+ -+ // 2. Can we do any more work? -+ err := h.AssertMachineHasCapacity() -+ if err != nil { -+ log.Info(err) -+ return false -+ } -+ -+ // 3. What does the impl say? -+ tID, err := h.CanAccept(ids) -+ if err != nil { -+ log.Error(err) -+ return false -+ } -+ if tID == nil { -+ log.Infow("did not accept task", "task_id", ids[0], "reason", "CanAccept() refused") -+ return false -+ } -+ -+ // 4. Can we claim the work for our hostname? 
-+ ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) -+ if err != nil { -+ log.Error(err) -+ return false -+ } -+ if ct == 0 { -+ log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken") -+ return false -+ } -+ -+ go func() { -+ h.Count.Add(1) -+ -+ var done bool -+ var doErr error -+ workStart := time.Now() -+ -+ defer func() { -+ if r := recover(); r != nil { -+ log.Error("Recovered from a serious error "+ -+ "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r) -+ } -+ h.Count.Add(-1) -+ -+ h.recordCompletion(*tID, workStart, done, doErr) -+ if done { -+ h.triggerCompletionListeners(*tID) -+ } -+ -+ h.TaskEngine.tryAllWork <- true // Activate tasks in this machine -+ }() -+ -+ done, doErr = h.Do(*tID, func() bool { -+ var owner int -+ // Background here because we don't want GracefulRestart to block this save. -+ err := h.TaskEngine.db.QueryRow(context.Background(), -+ `SELECT owner_id FROM harmony_task WHERE id=$1`, *tID).Scan(&owner) -+ if err != nil { -+ log.Error("Cannot determine ownership: ", err) -+ return false -+ } -+ return owner == h.TaskEngine.ownerID -+ }) -+ if doErr != nil { -+ log.Error("Do("+h.Name+", taskID="+strconv.Itoa(int(*tID))+") returned error: ", doErr) -+ } -+ }() -+ return true -+} -+ -+func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { -+ workEnd := time.Now() -+ -+ cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool { -+ var postedTime time.Time -+ err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) -+ if err != nil { -+ log.Error("Could not log completion: ", err) -+ return false -+ } -+ result := "unspecified error" -+ if done { -+ _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) -+ if err != nil { -+ log.Error("Could not log completion: 
", err) -+ return false -+ } -+ result = "" -+ } else { -+ if doErr != nil { -+ result = "error: " + doErr.Error() -+ } -+ var deleteTask bool -+ if h.MaxFailures > 0 { -+ ct := uint(0) -+ err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history -+ WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct) -+ if err != nil { -+ log.Error("Could not read task history:", err) -+ return false -+ } -+ if ct >= h.MaxFailures { -+ deleteTask = true -+ } -+ } -+ if deleteTask { -+ _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) -+ if err != nil { -+ log.Error("Could not delete failed job: ", err) -+ return false -+ } -+ // Note: Extra Info is left laying around for later review & clean-up -+ } else { -+ tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID) -+ if err != nil { -+ log.Error("Could not disown failed task: ", tID, err) -+ return false -+ } -+ } -+ } -+ _, err = tx.Exec(`INSERT INTO harmony_task_history -+ (task_id, name, posted, work_start, work_end, result, err) -+ VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result) -+ if err != nil { -+ log.Error("Could not write history: ", err) -+ return false -+ } -+ return true -+ }) -+ if err != nil { -+ log.Error("Could not record transaction: ", err) -+ return -+ } -+ if !cm { -+ log.Error("Committing the task records failed") -+ } -+} -+ -+func (h *taskTypeHandler) AssertMachineHasCapacity() error { -+ r := h.TaskEngine.resourcesInUse() -+ -+ if r.Cpu-h.Cost.Cpu < 0 { -+ return errors.New("Did not accept " + h.Name + " task: out of cpu") -+ } -+ if h.Cost.Ram > r.Ram { -+ return errors.New("Did not accept " + h.Name + " task: out of RAM") -+ } -+ if r.Gpu-h.Cost.Gpu < 0 { -+ return errors.New("Did not accept " + h.Name + " task: out of available GPU") -+ } -+ return nil -+} -+ -+var hClient = http.Client{} -+ -+func init() { -+ hClient.Timeout = 3 * time.Second -+} -+ -+// triggerCompletionListeners does in order: -+// 1. 
Trigger all in-process followers (b/c it's fast). -+// 2. Trigger all living processes with followers via DB -+// 3. Future followers (think partial upgrade) can read harmony_task_history -+// 3a. The Listen() handles slow follows. -+func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) { -+ // InProcess (#1 from Description) -+ inProcessDefs := h.TaskEngine.follows[h.Name] -+ inProcessFollowers := make([]string, len(inProcessDefs)) -+ for _, fs := range inProcessDefs { -+ if fs.f(tID, fs.h.AddTask) { -+ inProcessFollowers = append(inProcessFollowers, fs.h.Name) -+ } -+ } -+ -+ // Over HTTP (#2 from Description) -+ var hps []struct { -+ HostAndPort string -+ ToType string -+ } -+ err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type -+ FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id -+ WHERE from_type=$1 AND to_type NOT IN $2 AND f.owner_id != $3`, -+ h.Name, inProcessFollowers, h.TaskEngine.ownerID) -+ if err != nil { -+ log.Warn("Could not fast-trigger partner processes.", err) -+ return -+ } -+ hostsVisited := map[string]bool{} -+ tasksVisited := map[string]bool{} -+ for _, v := range hps { -+ if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] { -+ continue -+ } -+ resp, err := hClient.Get(v.HostAndPort + "/scheduler/follows/" + h.Name) -+ if err != nil { -+ log.Warn("Couldn't hit http endpoint: ", err) -+ continue -+ } -+ b, err := io.ReadAll(resp.Body) -+ if err != nil { -+ log.Warn("Couldn't hit http endpoint: ", err) -+ continue -+ } -+ hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true -+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { -+ log.Error("IO failed for fast nudge: ", string(b)) -+ continue -+ } -+ } -+} -diff --git a/lib/harmony/resources/memsys.go b/lib/harmony/resources/memsys.go -new file mode 100644 -index 000000000..1a45b5b22 ---- /dev/null -+++ b/lib/harmony/resources/memsys.go -@@ -0,0 +1,22 @@ -+//go:build darwin || 
freebsd || openbsd || dragonfly || netbsd -+// +build darwin freebsd openbsd dragonfly netbsd -+ -+package resources -+ -+import ( -+ "encoding/binary" -+ "syscall" -+) -+ -+func sysctlUint64(name string) (uint64, error) { -+ s, err := syscall.Sysctl(name) -+ if err != nil { -+ return 0, err -+ } -+ // hack because the string conversion above drops a \0 -+ b := []byte(s) -+ if len(b) < 8 { -+ b = append(b, 0) -+ } -+ return binary.LittleEndian.Uint64(b), nil -+} -diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go -new file mode 100644 -index 000000000..77200b873 ---- /dev/null -+++ b/lib/harmony/resources/resources.go -@@ -0,0 +1,180 @@ -+package resources -+ -+import ( -+ "bytes" -+ "context" -+ "fmt" -+ "os/exec" -+ "regexp" -+ "runtime" -+ "strings" -+ "sync/atomic" -+ "time" -+ -+ cl "github.com/Nv7-Github/go-cl" -+ ffi "github.com/filecoin-project/filecoin-ffi" -+ "github.com/filecoin-project/lotus/lib/harmony/harmonydb" -+ logging "github.com/ipfs/go-log/v2" -+ "github.com/pbnjay/memory" -+ -+ "golang.org/x/sys/unix" -+ -+ "github.com/samber/lo" -+) -+ -+var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats -+ -+type Resources struct { -+ Cpu int -+ Gpu float64 -+ GpuRam uint64 -+ Ram uint64 -+ MachineID int -+} -+type Reg struct { -+ Resources -+ shutdown atomic.Bool -+} -+ -+var logger = logging.Logger("harmonytask") -+ -+var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted") -+ -+func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { -+ var reg Reg -+ var err error -+ reg.Resources, err = getResources() -+ if err != nil { -+ return nil, err -+ } -+ ctx := context.Background() -+ { // Learn our owner_id while updating harmony_machines -+ var ownerID []int -+ err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort) -+ if err != nil { -+ return nil, fmt.Errorf("could not read from harmony_machines: %w", err) -+ } -+ if 
len(ownerID) == 0 { -+ err = db.QueryRow(ctx, `INSERT INTO harmony_machines -+ (host_and_port, cpu, ram, gpu, gpuram) VALUES -+ ($1,$2,$3,$4,$5) RETURNING id`, -+ hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam).Scan(&reg.Resources.MachineID) -+ if err != nil { -+ return nil, err -+ } -+ -+ } else { -+ reg.MachineID = ownerID[0] -+ _, err := db.Exec(ctx, `UPDATE harmony_machines SET -+ cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$6`, -+ reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam, reg.Resources.MachineID) -+ if err != nil { -+ return nil, err -+ } -+ } -+ CleanupMachines(context.Background(), db) -+ } -+ go func() { -+ for { -+ time.Sleep(time.Minute) -+ if reg.shutdown.Load() { -+ return -+ } -+ _, err := db.Exec(ctx, `UPDATE harmony_machines SET last_contact=CURRENT_TIMESTAMP`) -+ if err != nil { -+ logger.Error("Cannot keepalive ", err) -+ } -+ } -+ }() -+ -+ return &reg, nil -+} -+func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { -+ ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, -+ time.Now().Add(-1*LOOKS_DEAD_TIMEOUT)) -+ if err != nil { -+ logger.Warn("unable to delete old machines: ", err) -+ } -+ return ct -+} -+ -+func (res *Reg) Shutdown() { -+ res.shutdown.Store(true) -+} -+ -+func getResources() (res Resources, err error) { -+ b, err := exec.Command(`ps`, `-ef`).CombinedOutput() -+ if err != nil { -+ logger.Warn("Could not safety check for 2+ processes: ", err) -+ } else { -+ found := 0 -+ for _, b := range bytes.Split(b, []byte("\n")) { -+ if lotusRE.Match(b) { -+ found++ -+ } -+ } -+ if found > 1 { -+ logger.Error("This Lotus process should run alone on a machine.
Use CGroup.") -+ } -+ } -+ -+ res = Resources{ -+ Cpu: runtime.NumCPU(), -+ Ram: memory.FreeMemory(), -+ GpuRam: getGpuRam(), -+ } -+ -+ { // GPU boolean -+ gpus, err := ffi.GetGPUDevices() -+ if err != nil { -+ logger.Errorf("getting gpu devices failed: %+v", err) -+ } -+ all := strings.ToLower(strings.Join(gpus, ",")) -+ if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") { -+ res.Gpu = 1 -+ } -+ } -+ -+ return res, nil -+} -+ -+func getGpuRam() uint64 { -+ platforms, err := cl.GetPlatforms() -+ if err != nil { -+ logger.Error(err) -+ return 0 -+ } -+ -+ return uint64(lo.SumBy(platforms, func(p *cl.Platform) int64 { -+ d, err := p.GetDevices(cl.DeviceTypeAll) -+ if err != nil { -+ logger.Error(err) -+ return 0 -+ } -+ return lo.SumBy(d, func(d *cl.Device) int64 { return d.GlobalMemSize() }) -+ })) -+} -+ -+func DiskFree(path string) (uint64, error) { -+ s := unix.Statfs_t{} -+ err := unix.Statfs(path, &s) -+ if err != nil { -+ return 0, err -+ } -+ -+ return s.Bfree * uint64(s.Bsize), nil -+} -+ -+/* NOT for Darwin. -+func GetMemFree() uint64 { -+ in := unix.Sysinfo_t{} -+ err := unix.Sysinfo(&in) -+ if err != nil { -+ return 0 -+ } -+ // If this is a 32-bit system, then these fields are -+ // uint32 instead of uint64. -+ // So we always convert to uint64 to match signature. -+ return uint64(in.Freeram) * uint64(in.Unit) -+} -+*/