diff --git a/.circleci/config.yml b/.circleci/config.yml index 7abb9d5fc..64da19e34 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -783,6 +783,12 @@ workflows: - build suite: itest-harmonydb target: "./itests/harmonydb_test.go" + - test: + name: test-itest-harmonytask + requires: + - build + suite: itest-harmonytask + target: "./itests/harmonytask_test.go" - test: name: test-itest-lite_migration requires: diff --git a/go.mod b/go.mod index 2da784ad6..714444dcf 100644 --- a/go.mod +++ b/go.mod @@ -128,11 +128,13 @@ require ( github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-varint v0.0.7 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/polydawn/refmt v0.89.0 github.com/prometheus/client_golang v1.16.0 github.com/puzpuzpuz/xsync/v2 v2.4.0 github.com/raulk/clock v1.1.0 github.com/raulk/go-watchdog v1.3.0 + github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/urfave/cli/v2 v2.25.5 @@ -156,7 +158,7 @@ require ( golang.org/x/exp v0.0.0-20230321023759-10a507213a29 golang.org/x/net v0.10.0 golang.org/x/sync v0.2.0 - golang.org/x/sys v0.9.0 + golang.org/x/sys v0.10.0 golang.org/x/term v0.9.0 golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 golang.org/x/tools v0.9.1 @@ -203,6 +205,7 @@ require ( github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gdamore/encoding v1.0.0 // indirect + github.com/gin-gonic/gin v1.9.1 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.2.4 // indirect @@ -290,7 +293,6 @@ require ( github.com/onsi/ginkgo/v2 v2.9.7 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -311,7 +313,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/tidwall/gjson v1.14.4 // indirect github.com/twmb/murmur3 v1.1.6 // indirect - github.com/ugorji/go/codec v1.2.6 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.0.1 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect diff --git a/go.sum b/go.sum index ebbc4dcc8..496fe2205 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f github.com/buger/goterm v1.0.3 h1:7V/HeAQHrzPk/U4BvyH2g9u+xbUW9nr4yRPyG59W4fM= github.com/buger/goterm v1.0.3/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -152,6 +153,7 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM= github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ= @@ -386,6 +388,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gbrlsnchs/jwt/v3 v3.0.1 h1:lbUmgAKpxnClrKloyIwpxm4OuWeDl5wLk52G91ODPw4= github.com/gbrlsnchs/jwt/v3 v3.0.1/go.mod h1:AncDcjXz18xetI3A6STfXq2w+LuTx8pQ8bGEwRN8zVM= github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko= @@ -397,8 +400,9 @@ github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= +github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs= @@ -443,12 +447,12 @@ github.com/go-openapi/swag v0.19.8/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfT github.com/go-openapi/swag v0.19.11 h1:RFTu/dlFySpyVvJDfp/7674JY4SDglYWKztbiIGFpmc= github.com/go-openapi/swag v0.19.11/go.mod h1:Uc0gKkdR+ojzsEpjh39QChyu92vPgIr72POcgHMAgSY= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= 
+github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= @@ -464,6 +468,7 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= @@ -968,8 +973,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= @@ -1405,7 +1410,9 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= @@ -1512,6 +1519,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= 
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= @@ -1618,16 +1627,16 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ= -github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -1789,6 +1798,7 @@ go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1 go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg= go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc= go4.org v0.0.0-20230225012048-214862532bf5/go.mod h1:F57wTi5Lrj6WLyswp5EYV1ncrEbFGHD4hhz6S1ZYeaU= +golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -2064,8 +2074,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 
h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go new file mode 100644 index 000000000..b36e9ab11 --- /dev/null +++ b/itests/harmonytask_test.go @@ -0,0 +1,255 @@ +package itests + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/node/impl" +) + +type task1 struct { + toAdd []int + myPersonalTableLock sync.Mutex + myPersonalTable map[harmonytask.TaskID]int // This would typically be a DB table + WorkCompleted []string +} + +func withDbSetup(t *testing.T, f func(*kit.TestMiner)) { + _, miner, _ := kit.EnsembleMinimal(t, + kit.LatestActorsAt(-1), + kit.MockProofs(), + ) + + f(miner) +} + +func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + if !stillOwned() { + return false, errors.New("Why not still owned?") + } + t.myPersonalTableLock.Lock() + defer t.myPersonalTableLock.Unlock() + t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID])) + return true, nil +} +func (t *task1) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { + return &list[0], nil +} +func (t *task1) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 100, + Name: "ThingOne", + MaxFailures: 1, + Cost: resources.Resources{ + Cpu: 1, + Ram: 100 << 10, // at 100kb, it's tiny + }, + } +} +func (t *task1) Adder(add harmonytask.AddTaskFunc) { + for _, vTmp := range t.toAdd { + v := vTmp + add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { + t.myPersonalTableLock.Lock() + defer t.myPersonalTableLock.Unlock() + + t.myPersonalTable[tID] = v + return true + }) + } +} + +func TestHarmonyTasks(t *testing.T) { + withDbSetup(t, func(m *kit.TestMiner) { + cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + t1 := &task1{ + toAdd: []int{56, 73}, + myPersonalTable: map[harmonytask.TaskID]int{}, + } + e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") + require.NoError(t, err) + time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. 
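+		// (The engine is poll-driven and exposes no completion signal to wait on,
+		// so the test can only sleep long enough for both tasks to run.)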
+ e.GracefullyTerminate(time.Minute) + expected := []string{"taskResult56", "taskResult73"} + sort.Strings(t1.WorkCompleted) + require.Equal(t, t1.WorkCompleted, expected, "unexpected results") + }) +} + +type passthru struct { + dtl harmonytask.TaskTypeDetails + do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) + canAccept func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) + adder func(add harmonytask.AddTaskFunc) +} + +func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + return t.do(tID, stillOwned) +} +func (t *passthru) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { + return t.canAccept(list) +} +func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails { + return t.dtl +} +func (t *passthru) Adder(add harmonytask.AddTaskFunc) { + if t.adder != nil { + t.adder(add) + } +} + +// Common stuff +var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}} +var lettersMutex sync.Mutex + +func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru { + return &passthru{ + dtl: dtl, + canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil }, + adder: func(add harmonytask.AddTaskFunc) { + for _, vTmp := range []string{"A", "B"} { + v := vTmp + add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool { + _, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v) + require.NoError(t, err) + return true + }) + } + }, + } +} +func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru { + return &passthru{ + dtl: dtl, + canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, + do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + var content string + err = cdb.QueryRow(context.Background(), + "SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content) + require.NoError(t, err) + lettersMutex.Lock() + defer lettersMutex.Unlock() + *dest = append(*dest, content) + return true, nil + }, + } +} + +func TestHarmonyTasksWith2PartiesPolling(t *testing.T) { + withDbSetup(t, func(m *kit.TestMiner) { + cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + senderParty := fooLetterAdder(t, cdb) + var dest []string + workerParty := fooLetterSaver(t, cdb, &dest) + harmonytask.POLL_DURATION = time.Millisecond * 100 + sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1") + require.NoError(t, err) + worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2") + require.NoError(t, err) + time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + sender.GracefullyTerminate(time.Second * 5) + worker.GracefullyTerminate(time.Second * 5) + sort.Strings(dest) + require.Equal(t, dest, []string{"A", "B"}) + }) +} + +func TestWorkStealing(t *testing.T) { + withDbSetup(t, func(m *kit.TestMiner) { + cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + ctx := context.Background() + + // The dead worker will be played by a few SQL INSERTS. 
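+		// (They fabricate machine 300 with a last_contact back in 2000, plus a task
+		// it "owns", so the cleanup pass can declare it dead and free the task.)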
+ _, err := cdb.Exec(ctx, `INSERT INTO harmony_machines + (id, last_contact,host_and_port, cpu, ram, gpu, gpuram) + VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`) + require.ErrorIs(t, err, nil) + _, err = cdb.Exec(ctx, `INSERT INTO harmony_task + (id, name, owner_id, posted_time, added_by) + VALUES (1234, 'foo', 300, DATE '2000-01-01', 300)`) + require.ErrorIs(t, err, nil) + _, err = cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int, content) VALUES (1234, 'M')") + require.ErrorIs(t, err, nil) + + harmonytask.POLL_DURATION = time.Millisecond * 100 + harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100 + var dest []string + worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2") + require.ErrorIs(t, err, nil) + time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE. + worker.GracefullyTerminate(time.Second * 5) + require.Equal(t, []string{"M"}, dest) + }) +} + +func TestTaskRetry(t *testing.T) { + withDbSetup(t, func(m *kit.TestMiner) { + cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + senderParty := fooLetterAdder(t, cdb) + harmonytask.POLL_DURATION = time.Millisecond * 100 + sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1") + require.NoError(t, err) + + alreadyFailed := map[string]bool{} + var dest []string + fails2xPerMsg := &passthru{ + dtl: dtl, + canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil }, + do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + var content string + err = cdb.QueryRow(context.Background(), + "SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content) + require.NoError(t, err) + lettersMutex.Lock() + defer lettersMutex.Unlock() + if !alreadyFailed[content] { + alreadyFailed[content] = true + return false, errors.New("intentional 'error'") + } + dest = append(dest, content) + return true, nil + }, + } + rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2") + require.NoError(t, err) + time.Sleep(3 * time.Second) + sender.GracefullyTerminate(time.Hour) + rcv.GracefullyTerminate(time.Hour) + sort.Strings(dest) + require.Equal(t, []string{"A", "B"}, dest) + type hist struct { + TaskID int + Result bool + Err string + } + var res []hist + require.NoError(t, cdb.Select(context.Background(), &res, + `SELECT task_id, result, err FROM harmony_task_history + ORDER BY result DESC, task_id`)) + + require.Equal(t, []hist{ + {1, true, ""}, + {2, true, ""}, + {1, false, "error: intentional 'error'"}, + {2, false, "error: intentional 'error'"}}, res) + }) +} + +/* +FUTURE test fast-pass round-robin via http calls (3party) once the API for that is set +It's necessary for WinningPoSt. + +FUTURE test follows. +It's necessary for sealing work. 
+*/
diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go
index fd31e7a13..48e3db6fa 100644
--- a/lib/harmony/harmonydb/harmonydb.go
+++ b/lib/harmony/harmonydb/harmonydb.go
@@ -118,21 +118,25 @@ type tracer struct {
 type ctxkey string
 
-var sqlStart = ctxkey("sqlStart")
+const SQL_START = ctxkey("sqlStart")
+const SQL_STRING = ctxkey("sqlString")
 
 func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
-	return context.WithValue(ctx, sqlStart, time.Now())
+	return context.WithValue(context.WithValue(ctx, SQL_START, time.Now()), SQL_STRING, data.SQL)
 }
 
 func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
 	DBMeasures.Hits.M(1)
-	ms := time.Since(ctx.Value(sqlStart).(time.Time)).Milliseconds()
+	ms := time.Since(ctx.Value(SQL_START).(time.Time)).Milliseconds()
 	DBMeasures.TotalWait.M(ms)
 	DBMeasures.Waits.Observe(float64(ms))
 	if data.Err != nil {
 		DBMeasures.Errors.M(1)
 	}
-	// Can log what type of query it is, but not what tables
-	// Can log rows affected.
+	logger.Debugw("SQL run",
+		"query", ctx.Value(SQL_STRING).(string),
+		"err", data.Err,
+		"rowCt", data.CommandTag.RowsAffected(),
+		"milliseconds", ms)
 }
 
 // addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB.
@@ -250,8 +254,9 @@ func (db *DB) upgrade() error {
 		}
 		_, err = db.pgx.Exec(context.Background(), s)
 		if err != nil {
-			db.log(fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error()))
-			return err
+			msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())
+			db.log(msg)
+			return errors.New(msg) // makes devs' lives easier by placing the message at the end.
 		}
 	}
 
diff --git a/lib/harmony/harmonydb/sql/20230706.sql b/lib/harmony/harmonydb/sql/20230706.sql
index b45aca7fa..a4a333b81 100644
--- a/lib/harmony/harmonydb/sql/20230706.sql
+++ b/lib/harmony/harmonydb/sql/20230706.sql
@@ -2,5 +2,6 @@ CREATE TABLE itest_scratch (
     id SERIAL PRIMARY KEY,
     content TEXT,
     some_int INTEGER,
+    second_int INTEGER,
     update_time TIMESTAMP DEFAULT current_timestamp
 )
\ No newline at end of file
diff --git a/lib/harmony/harmonydb/sql/20230719.sql b/lib/harmony/harmonydb/sql/20230719.sql
new file mode 100644
index 000000000..0a676526b
--- /dev/null
+++ b/lib/harmony/harmonydb/sql/20230719.sql
@@ -0,0 +1,52 @@
+/* For HarmonyTask base implementation. */
+
+CREATE TABLE harmony_machines (
+    id SERIAL PRIMARY KEY NOT NULL,
+    last_contact TIMESTAMP NOT NULL DEFAULT current_timestamp,
+    host_and_port varchar(300) NOT NULL,
+    cpu INTEGER NOT NULL,
+    ram BIGINT NOT NULL,
+    gpu FLOAT NOT NULL,
+    gpuram BIGINT NOT NULL
+);
+
+CREATE TABLE harmony_task (
+    id SERIAL PRIMARY KEY NOT NULL,
+    initiated_by INTEGER,
+    update_time TIMESTAMP NOT NULL DEFAULT current_timestamp,
+    posted_time TIMESTAMP NOT NULL,
+    owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL,
+    added_by INTEGER NOT NULL,
+    previous_task INTEGER,
+    name varchar(8) NOT NULL
+);
+COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
+COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines; may be NULL if between owners or not yet taken.';
+COMMENT ON COLUMN harmony_task.name IS 'The name of the task type.';
+COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified; not a heartbeat.';
+
+CREATE TABLE harmony_task_history (
+    id SERIAL PRIMARY KEY NOT NULL,
+    task_id INTEGER NOT NULL,
+    name VARCHAR(8) NOT NULL,
+    posted TIMESTAMP NOT NULL,
+    work_start TIMESTAMP NOT NULL,
+    work_end TIMESTAMP NOT NULL,
+    result BOOLEAN NOT NULL,
+    err varchar
+);
+COMMENT ON COLUMN harmony_task_history.result IS 'Use to determine if this was a successful run.';
+
+CREATE TABLE harmony_task_follow (
+    id SERIAL PRIMARY KEY NOT NULL,
+    owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
+    to_type VARCHAR(8) NOT NULL,
+    from_type VARCHAR(8) NOT NULL
+);
+
+CREATE TABLE harmony_task_impl (
+    id SERIAL PRIMARY KEY NOT NULL,
+    owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
+    name VARCHAR(8) NOT NULL
+);
\ No newline at end of file
diff --git a/lib/harmony/harmonytask/doc.go b/lib/harmony/harmonytask/doc.go
new file mode 100644
index 000000000..772b674cd
--- /dev/null
+++ b/lib/harmony/harmonytask/doc.go
@@ -0,0 +1,79 @@
+/*
+Package harmonytask implements a pure (no task logic), distributed
+task manager. This clean interface allows a task implementer to completely
+avoid being concerned with task scheduling and management.
+It's based on the idea of tasks as small units of work broken from other
+work by hardware, parallelizability, reliability, or any other reason.
+Workers will be greedy: vacuuming up their favorite jobs from a list.
+Once 1 task is accepted, harmonydb tries to get other task runner
+machines to accept work (round robin) before trying again to accept.
+
+Mental Model:
+
+	Things that block tasks:
+	  - task not registered for any running server
+	  - max was specified and reached
+	  - resource exhaustion
+	  - CanAccept() interface (per-task implementation) does not accept it.
+	Ways tasks start: (slowest first)
+	  - DB Read every 1 minute
+	  - Bump via HTTP if registered in DB
+	  - Task was added (to db) by this process
+	Ways tasks get added:
+	  - Async Listener task (for chain, etc)
+	  - Followers: Tasks get added because another task completed
+	When Follower collectors run:
+	  - If both sides are process-local, then this process will pick it up.
+	  - If properly registered already, the http endpoint will be tried to start it.
+	  - Otherwise, at the listen interval during db scrape it will be found.
+	How duplicate tasks are avoided:
+	  - that's up to the task definition, but probably a unique key
+
+To use:
+ 1. Implement TaskInterface for a new task.
+ 2. Have New() receive this & all other ACTIVE implementations.
+
+As we are not expecting DBAs in this database, it's important to know
+what grows uncontrolled. The only harmony_* tables that do are
+harmony_task_history (somewhat quickly) and harmony_machines (slowly);
+they will need a clean-up once their rows can no longer be acted upon.
+The design, however, **requires** extraInfo tables to grow until the task's
+info could not possibly be used by a following task, including a slow
+release rollout. This would normally be on the order of months old.
+
+Other possible enhancements include more collaborative coordination
+to assign a task to machines closer to the data.
+
+__Database_Behavior__
+harmony_task is the list of work that has not been completed.
+
+	AddTaskFunc manages the additions, but is designed to have its
+	transactions failed-out on overlap with a similar task already written.
+	It's up to the TaskInterface implementer to discover this overlap via
+	some other table it uses (since overlap can mean very different things).
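+
+	A minimal sketch of that pattern (bazInfoTable and qix are illustrative
+	names, not part of this package): give the extraInfo table a UNIQUE
+	index over the fields that define "the same work", and let the insert
+	collision fail the transaction:
+
+		add(func(id harmonytask.TaskID, tx *harmonydb.Tx) bool {
+			_, err := tx.Exec(`INSERT INTO bazInfoTable (task_id, qix)
+				VALUES ($1,$2)`, id, qix) // a duplicate qix violates the index
+			return err == nil             // false rolls the whole add back
+		})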
+
+harmony_task_history
+
+	This holds transactions that completed or saw too many retries. It also
+	serves as input for subsequent (follower) tasks to kick off. This is not
+	done machine-internally because a follower may not be on the same machine
+	as the previous task.
+
+harmony_machines
+
+	Managed by lib/harmony/resources, this is a reference to machines registered
+	via that package. This registration does not obligate the machine to
+	anything, but serves as a discovery mechanism. Paths are hostnames + ports
+	which are presumed to support http, but this assumption is only used by
+	the task system.
+
+harmony_task_follow / harmony_task_impl
+
+	These tables are used to fast-path notifications to other machines instead
+	of waiting for polling. _impl helps round-robin work pick-up. _follow helps
+	discover the machines that are interested in creating tasks following the
+	task that just completed.
+*/
+package harmonytask
diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go
new file mode 100644
index 000000000..cd401f6d2
--- /dev/null
+++ b/lib/harmony/harmonytask/harmonytask.go
@@ -0,0 +1,408 @@
+package harmonytask
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/gorilla/mux"
+
+	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+	"github.com/filecoin-project/lotus/lib/harmony/resources"
+)
+
+// Consts (except for unit test)
+var POLL_DURATION = time.Minute         // Poll for Work this frequently
+var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
+
+type TaskTypeDetails struct {
+	// Max returns how many tasks this machine can run of this type.
+	// Negative means unrestricted.
+	Max int
+
+	// Name is the task name to be added to the task list.
+	Name string
+
+	// Peak costs to Do() the task.
+	Cost resources.Resources
+
+	// Max Failure count before the job is dropped.
+	// 0 = retry forever
+	MaxFailures uint
+
+	// Follow another task's completion via this task's creation.
+	// The function should populate extraInfo from data
+	// available from the previous task's tables, using the given TaskID.
+	// It should also return success if the trigger succeeded.
+	// NOTE: if refactoring tasks, see if your task is
+	// necessary. Ex: Is the sector state correct for your stage to run?
+	Follows map[string]func(TaskID, AddTaskFunc) bool
+}
+
+// TaskInterface must be implemented in order to have a task used by harmonytask.
+type TaskInterface interface {
+	// Do the task assigned. Call stillOwned before making single-writer-only
+	// changes to ensure the work has not been stolen.
+	// This is the ONLY function that should attempt to do the work, and must
+	// ONLY be called by harmonytask.
+	// Indicate that the task no longer needs scheduling with done=true, including
+	// cases where it's past the deadline.
+	Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
+
+	// CanAccept should return whether the task can run on this machine. It should
+	// return nil if the task type is not allowed on this machine.
+	// It should select the task it most wants to accomplish.
+	// It is also responsible for determining & reserving disk space (including scratch).
+	CanAccept([]TaskID) (*TaskID, error)
+
+	// TypeDetails() returns static details about how this task behaves and
+	// how this machine will run it. Read once at the beginning.
+	TypeDetails() TaskTypeDetails
+
+	// This listener will consume all external sources continuously for work.
+	// Do() may also be called from a backlog of work. This must not
+	// start doing the work (it still must be scheduled).
+	// Note: Task de-duplication should happen in the extraInfo func by
+	// returning false, typically by determining from the tx that the work
+	// exists already. The easy way is to have a unique joint index
+	// across all fields that will be common.
+	// Adder should typically only add its own task type, but multiple
+	// is possible for when 1 trigger starts 2 things.
+	// Usage Example:
+	// func (b *BazType) Adder(addTask harmonytask.AddTaskFunc) {
+	//	for {
+	//		bazMaker := <-bazChannel
+	//		addTask(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
+	//			_, err := tx.Exec(`INSERT INTO bazInfoTable (task_id, qix, mot)
+	//				VALUES ($1,$2,$3)`, tID, bazMaker.qix, bazMaker.mot)
+	//			if err != nil {
+	//				scream(err)
+	//				return false
+	//			}
+	//			return true
+	//		})
+	//	}
+	// }
+	Adder(AddTaskFunc)
+}
+
+type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool)
+
+type TaskEngine struct {
+	ctx            context.Context
+	handlers       []*taskTypeHandler
+	db             *harmonydb.DB
+	workAdderMutex sync.Mutex
+	reg            *resources.Reg
+	grace          context.CancelFunc
+	taskMap        map[string]*taskTypeHandler
+	ownerID        int
+	tryAllWork     chan bool // notify if work completed
+	follows        map[string][]followStruct
+	lastFollowTime time.Time
+	lastCleanup    atomic.Value
+}
+type followStruct struct {
+	f func(TaskID, AddTaskFunc) bool
+	h *taskTypeHandler
+}
+
+type TaskID int
+
+// New creates all the task definitions. Note that TaskEngine
+// knows nothing about the tasks themselves and serves as a
+// generic container for common work.
+func New(
+	db *harmonydb.DB,
+	impls []TaskInterface,
+	hostnameAndPort string) (*TaskEngine, error) {
+
+	reg, err := resources.Register(db, hostnameAndPort)
+	if err != nil {
+		return nil, fmt.Errorf("cannot get resources: %w", err)
+	}
+	ctx, grace := context.WithCancel(context.Background())
+	e := &TaskEngine{
+		ctx:        ctx,
+		grace:      grace,
+		db:         db,
+		reg:        reg,
+		ownerID:    reg.Resources.MachineID, // The current number representing "hostAndPort"
+		taskMap:    make(map[string]*taskTypeHandler, len(impls)),
+		tryAllWork: make(chan bool),
+		follows:    make(map[string][]followStruct),
+	}
+	e.lastCleanup.Store(time.Now())
+	for _, c := range impls {
+		h := taskTypeHandler{
+			TaskInterface:   c,
+			TaskTypeDetails: c.TypeDetails(),
+			TaskEngine:      e,
+		}
+		e.handlers = append(e.handlers, &h)
+		e.taskMap[h.TaskTypeDetails.Name] = &h
+
+		_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name)
+			VALUES ($1,$2)`, e.ownerID, h.Name)
+		if err != nil {
+			return nil, fmt.Errorf("can't update impl: %w", err)
+		}
+
+		for name, fn := range c.TypeDetails().Follows {
+			e.follows[name] = append(e.follows[name], followStruct{fn, &h})
+
+			// populate harmony_task_follow
+			_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follow (owner_id, from_type, to_type)
+				VALUES ($1,$2,$3)`, e.ownerID, name, h.Name)
+			if err != nil {
+				return nil, fmt.Errorf("can't update harmony_task_follow: %w", err)
+			}
+		}
+	}
+
+	// resurrect old work
+	{
+		var taskRet []struct {
+			ID   int
+			Name string
+		}
+
+		err := db.Select(e.ctx, &taskRet, `SELECT id, name from harmony_task WHERE owner_id=$1`, e.ownerID)
+		if err != nil {
+			return nil, err
+		}
+		for _, w := range taskRet {
+			// edge-case: if old assignments are not available tasks, unlock them.
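+			// (Likely cause: this process restarted with fewer task types registered,
+			// so clearing owner_id lets a machine that still runs the type take over.)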
+			h := e.taskMap[w.Name]
+			if h == nil {
+				_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, w.ID)
+				if err != nil {
+					log.Errorw("Cannot remove self from owner field", "error", err)
+				}
+				continue // not really fatal, but not great
+			}
+			if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
+				log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
+			}
+		}
+	}
+	for _, h := range e.handlers {
+		go h.Adder(h.AddTask)
+	}
+	go e.poller()
+
+	return e, nil
+}
+
+// GracefullyTerminate hangs until all present tasks have completed.
+// Call this to cleanly exit the process. As some processes are long-running,
+// passing a deadline will ignore those still running (to be picked-up later).
+func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
+	e.grace()
+	e.reg.Shutdown()
+	deadlineChan := time.NewTimer(deadline).C
+
+	ctx := context.TODO()
+
+	// block bumps & follows by unreg from DBs.
+	_, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
+	if err != nil {
+		log.Warn("Could not clean-up impl table: %w", err)
+	}
+	_, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
+	if err != nil {
+		log.Warn("Could not clean-up follow table: %w", err)
+	}
+top:
+	for _, h := range e.handlers {
+		if h.Count.Load() > 0 {
+			select {
+			case <-deadlineChan:
+				return
+			default:
+				time.Sleep(time.Millisecond)
+				goto top
+			}
+		}
+	}
+}
+
+func (e *TaskEngine) poller() {
+	for {
+		select {
+		case <-e.tryAllWork: ///////////////////// Find work after some work finished
+		case <-time.After(POLL_DURATION): //////// Find work periodically
+		case <-e.ctx.Done(): ///////////////////// Graceful exit
+			return
+		}
+		e.followWorkInDB()   // "Follows" the slow way
+		e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way
+	}
+}
+
+// followWorkInDB implements "Follows" the slow way
+func (e *TaskEngine) followWorkInDB() {
+	// Step 1: What are we following?
+	var lastFollowTime time.Time
+	lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now()
+
+	for fromName, srcs := range e.follows {
+		var cList []int // Which work is done (that we follow) since we last checked?
+		err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history h
+			WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, fromName)
+		if err != nil {
+			log.Error("Could not query DB: ", err)
+			return
+		}
+		for _, src := range srcs {
+			for _, workAlreadyDone := range cList { // Were any tasks made to follow these tasks?
+				var ct int
+				err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task
+					WHERE name=$1 AND previous_task=$2`, src.h.Name, workAlreadyDone).Scan(&ct)
+				if err != nil {
+					log.Error("Could not query harmony_task: ", err)
+					return // not recoverable here
+				}
+				if ct > 0 {
+					continue
+				}
+				// we need to create this task
+				if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) {
+					// But someone may have beaten us to it.
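+					// (Another machine can create the follow task between our COUNT(*)
+					// check and this call; the Follows func reports that by returning false.)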
+					log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
+				}
+			}
+		}
+	}
+}
+
+// pollerTryAllWork implements "Bumps" (next task) the slow way
+func (e *TaskEngine) pollerTryAllWork() {
+	if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
+		e.lastCleanup.Store(time.Now())
+		resources.CleanupMachines(e.ctx, e.db)
+	}
+	for _, v := range e.handlers {
+	rerun:
+		if v.AssertMachineHasCapacity() != nil {
+			continue
+		}
+		var unownedTasks []TaskID
+		err := e.db.Select(e.ctx, &unownedTasks, `SELECT id
+			FROM harmony_task
+			WHERE owner_id IS NULL AND name=$1
+			ORDER BY update_time`, v.Name)
+		if err != nil {
+			log.Error("Unable to read work ", err)
+			continue
+		}
+		accepted := v.considerWork("poller", unownedTasks)
+		if !accepted {
+			log.Warn("Work not accepted")
+			continue
+		}
+		if len(unownedTasks) > 1 {
+			e.bump(v.Name) // wait for others before trying again to add work.
+			goto rerun
+		}
+	}
+}
+
+// GetHttpHandlers needs to be used by the http server to register routes.
+// This implements the receiver-side of "follows" and "bumps" the fast way.
+func (e *TaskEngine) GetHttpHandlers() http.Handler {
+	root := mux.NewRouter()
+	s := root.PathPrefix("/scheduler").Subrouter()
+	f := s.PathPrefix("/follows").Subrouter()
+	b := s.PathPrefix("/bump").Subrouter()
+	for name, vsTmp := range e.follows {
+		vs := vsTmp
+		f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			tIDString := mux.Vars(r)["tID"]
+			tID, err := strconv.Atoi(tIDString)
+			if err != nil {
+				w.WriteHeader(400)
+				fmt.Fprint(w, err.Error())
+				return
+			}
+			taskAdded := false
+			for _, vTmp := range vs {
+				v := vTmp
+				taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask)
+			}
+			if taskAdded {
+				e.tryAllWork <- true
+				w.WriteHeader(200)
+				return
+			}
+			w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
+		})
+	}
+	for _, hTmp := range e.handlers {
+		h := hTmp
+		b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			tIDString := mux.Vars(r)["tID"]
+			tID, err := strconv.Atoi(tIDString)
+			if err != nil {
+				w.WriteHeader(400)
+				fmt.Fprint(w, err.Error())
+				return
+			}
+			// We NEED to block while trying to deliver
+			// this work to ease the network impact.
+			if h.considerWork("bump", []TaskID{TaskID(tID)}) {
+				w.WriteHeader(200)
+				return
+			}
+			w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
+		})
+	}
+	return root
+}
+
+func (e *TaskEngine) bump(taskType string) {
+	var res []string
+	err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m
+		JOIN harmony_task_impl i ON i.owner_id=m.id
+		WHERE i.name=$1`, taskType)
+	if err != nil {
+		log.Error("Could not read db for bump: ", err)
+		return
+	}
+	for _, url := range res {
+		resp, err := hClient.Get(url + "/scheduler/bump/" + taskType)
+		if err != nil {
+			log.Info("Server unreachable to bump: ", err)
+			continue
+		}
+		if resp.StatusCode == 200 {
+			return // just want 1 taker.
+		}
+	}
+}
+
+// resourcesInUse requires workAdderMutex to be already locked.
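+// It returns the machine's registered resources minus the peak Cost of every
+// task instance currently running, so callers can tell what is still free.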
+func (e *TaskEngine) resourcesInUse() resources.Resources {
+	tmp := e.reg.Resources
+	tmp.GpuRam = make([]uint64, len(e.reg.Resources.GpuRam))
+	copy(tmp.GpuRam, e.reg.Resources.GpuRam) // deep copy: the loop below mutates it
+	for _, t := range e.handlers {
+		ct := t.Count.Load()
+		tmp.Cpu -= int(ct) * t.Cost.Cpu
+		tmp.Gpu -= float64(ct) * t.Cost.Gpu
+		tmp.Ram -= uint64(ct) * t.Cost.Ram
+		if len(t.Cost.GpuRam) == 0 {
+			continue // this task type consumes no GPU RAM
+		}
+		for i := int32(0); i < ct; i++ {
+			found := false
+			for grIdx, j := range tmp.GpuRam {
+				if j > t.Cost.GpuRam[0] {
+					tmp.GpuRam[grIdx] = j - t.Cost.GpuRam[0]
+					found = true
+					break
+				}
+			}
+			if !found {
+				log.Warn("We should never get out of gpuram for what's consumed.")
+			}
+		}
+	}
+	return tmp
+}
diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go
new file mode 100644
index 000000000..932cfc297
--- /dev/null
+++ b/lib/harmony/harmonytask/task_type_handler.go
@@ -0,0 +1,292 @@
+package harmonytask
+
+import (
+	"context"
+	"errors"
+	"io"
+	"net/http"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	logging "github.com/ipfs/go-log/v2"
+
+	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+)
+
+var log = logging.Logger("harmonytask")
+
+type taskTypeHandler struct {
+	TaskInterface
+	TaskTypeDetails
+	TaskEngine *TaskEngine
+	Count      atomic.Int32
+}
+
+func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
+	var tID TaskID
+	did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
+		// create taskID (from DB); RETURNING avoids racing other machines' inserts.
+		err := tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time)
+			VALUES ($1, $2, CURRENT_TIMESTAMP) RETURNING id`, h.Name, h.TaskEngine.ownerID).Scan(&tID)
+		if err != nil {
+			log.Error("Could not insert into harmonyTask", err)
+			return false
+		}
+		return extra(tID, tx)
+	})
+	if err != nil {
+		log.Error(err)
+	}
+	if !did {
+		return
+	}
+
+	if !h.considerWork("adder", []TaskID{tID}) {
+		h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
+	}
+}
+
+func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
+	h.TaskEngine.workAdderMutex.Lock()
+	defer h.TaskEngine.workAdderMutex.Unlock()
+
+top: // `goto top` from below re-enters here with the lock still held.
+	if len(ids) == 0 {
+		return true // stop looking for takers
+	}
+
+	// 1. Can we do any more of this task type?
+	if h.Max > -1 && int(h.Count.Load()) == h.Max {
+		log.Infow("did not accept task", "name", h.Name, "reason", "at max already")
+		return false
+	}
+
+	// 2. Can we do any more work?
+	err := h.AssertMachineHasCapacity()
+	if err != nil {
+		log.Info(err)
+		return false
+	}
+
+	// 3. What does the impl say?
+	tID, err := h.CanAccept(ids)
+	if err != nil {
+		log.Error(err)
+		return false
+	}
+	if tID == nil {
+		log.Infow("did not accept task", "task_id", ids[0], "reason", "CanAccept() refused")
+		return false
+	}
+
+	// 4. Can we claim the work for our hostname?
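+	// (The `owner_id IS NULL` guard makes the UPDATE below an atomic claim: when
+	// two machines race, exactly one sees a row count of 1; the loser moves on.)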
+ ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) + if err != nil { + log.Error(err) + return false + } + if ct == 0 { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken") + var tryAgain = make([]TaskID, 0, len(ids)-1) + for _, id := range ids { + if id != *tID { + tryAgain = append(tryAgain, id) + } + } + ids = tryAgain + goto top + } + + go func() { + h.Count.Add(1) + log.Infow("Beginning work on Task", "id", *tID, "from", from, "type", h.Name) + + var done bool + var doErr error + workStart := time.Now() + + defer func() { + if r := recover(); r != nil { + log.Error("Recovered from a serious error "+ + "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r) + } + h.Count.Add(-1) + + h.recordCompletion(*tID, workStart, done, doErr) + if done { + h.triggerCompletionListeners(*tID) + } + + h.TaskEngine.tryAllWork <- true // Activate tasks in this machine + }() + + done, doErr = h.Do(*tID, func() bool { + var owner int + // Background here because we don't want GracefulRestart to block this save. + err := h.TaskEngine.db.QueryRow(context.Background(), + `SELECT owner_id FROM harmony_task WHERE id=$1`, *tID).Scan(&owner) + if err != nil { + log.Error("Cannot determine ownership: ", err) + return false + } + return owner == h.TaskEngine.ownerID + }) + if doErr != nil { + log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr) + } + }() + return true +} + +func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { + workEnd := time.Now() + + cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool { + var postedTime time.Time + err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) + if err != nil { + log.Error("Could not log completion: ", err) + return false + } + result := "unspecified error" + if done { + _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + if err != nil { + log.Error("Could not log completion: ", err) + return false + } + result = "" + } else { + if doErr != nil { + result = "error: " + doErr.Error() + } + var deleteTask bool + if h.MaxFailures > 0 { + ct := uint(0) + err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history + WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct) + if err != nil { + log.Error("Could not read task history:", err) + return false + } + if ct >= h.MaxFailures { + deleteTask = true + } + } + if deleteTask { + _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + if err != nil { + log.Error("Could not delete failed job: ", err) + return false + } + // Note: Extra Info is left laying around for later review & clean-up + } else { + _, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID) + if err != nil { + log.Error("Could not disown failed task: ", tID, err) + return false + } + } + } + _, err = tx.Exec(`INSERT INTO harmony_task_history + (task_id, name, posted, work_start, work_end, result, err) + VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result) + if err != nil { + log.Error("Could not write history: ", err) + return false + } + return true + }) + if err != nil { + log.Error("Could not record transaction: ", err) + return + } + if !cm { + log.Error("Committing the task records failed") + } +} + +func (h *taskTypeHandler) AssertMachineHasCapacity() 
error {
+	r := h.TaskEngine.resourcesInUse()
+
+	if r.Cpu-h.Cost.Cpu < 0 {
+		return errors.New("Did not accept " + h.Name + " task: out of cpu")
+	}
+	if h.Cost.Ram > r.Ram {
+		return errors.New("Did not accept " + h.Name + " task: out of RAM")
+	}
+	if r.Gpu-h.Cost.Gpu < 0 {
+		return errors.New("Did not accept " + h.Name + " task: out of available GPU")
+	}
+	if len(h.Cost.GpuRam) == 0 {
+		goto enoughGpuRam // this task type needs no GPU RAM
+	}
+	for _, u := range r.GpuRam {
+		if u > h.Cost.GpuRam[0] {
+			goto enoughGpuRam
+		}
+	}
+	return errors.New("Did not accept " + h.Name + " task: out of GPURam")
+enoughGpuRam:
+	return nil
+}
+
+var hClient = http.Client{}
+
+func init() {
+	hClient.Timeout = 3 * time.Second
+}
+
+// triggerCompletionListeners does in order:
+// 1. Trigger all in-process followers (b/c it's fast).
+// 2. Trigger all living processes with followers via DB
+// 3. Future followers (think partial upgrade) can read harmony_task_history
+// 3a. The Listen() handles slow follows.
+func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
+	// InProcess (#1 from Description)
+	inProcessDefs := h.TaskEngine.follows[h.Name]
+	inProcessFollowers := make([]string, 0, len(inProcessDefs))
+	for _, fs := range inProcessDefs {
+		if fs.f(tID, fs.h.AddTask) {
+			inProcessFollowers = append(inProcessFollowers, fs.h.Name)
+		}
+	}
+
+	// Over HTTP (#2 from Description)
+	var hps []struct {
+		HostAndPort string
+		ToType      string
+	}
+	err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type
+		FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id
+		WHERE from_type=$1 AND to_type <> ALL($2) AND f.owner_id != $3`,
+		h.Name, inProcessFollowers, h.TaskEngine.ownerID)
+	if err != nil {
+		log.Warn("Could not fast-trigger partner processes.", err)
+		return
+	}
+	hostsVisited := map[string]bool{}
+	tasksVisited := map[string]bool{}
+	for _, v := range hps {
+		if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] {
+			continue
+		}
+		resp, err := hClient.Get(v.HostAndPort + "/scheduler/follows/" + h.Name + "/" + strconv.Itoa(int(tID)))
+		if err != nil {
+			log.Warn("Couldn't hit http endpoint: ", err)
+			continue
+		}
+		b, err := io.ReadAll(resp.Body)
+		if err != nil {
+			log.Warn("Couldn't hit http endpoint: ", err)
+			continue
+		}
+		hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true
+		if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
+			log.Error("IO failed for fast nudge: ", string(b))
+			continue
+		}
+	}
+}
diff --git a/lib/harmony/resources/memsys.go b/lib/harmony/resources/memsys.go
new file mode 100644
index 000000000..1a45b5b22
--- /dev/null
+++ b/lib/harmony/resources/memsys.go
@@ -0,0 +1,22 @@
+//go:build darwin || freebsd || openbsd || dragonfly || netbsd
+// +build darwin freebsd openbsd dragonfly netbsd
+
+package resources
+
+import (
+	"encoding/binary"
+	"syscall"
+)
+
+func sysctlUint64(name string) (uint64, error) {
+	s, err := syscall.Sysctl(name)
+	if err != nil {
+		return 0, err
+	}
+	// hack because the string conversion above drops a \0
+	b := []byte(s)
+	if len(b) < 8 {
+		b = append(b, 0)
+	}
+	return binary.LittleEndian.Uint64(b), nil
+}
diff --git a/lib/harmony/resources/miniopencl/cl.h b/lib/harmony/resources/miniopencl/cl.h
new file mode 100644
index 000000000..e90fb7692
--- /dev/null
+++ b/lib/harmony/resources/miniopencl/cl.h
@@ -0,0 +1,17 @@
+
+#ifndef CL_H
+#define CL_H
+
+#define CL_USE_DEPRECATED_OPENCL_1_1_APIS
+#define CL_USE_DEPRECATED_OPENCL_1_2_APIS
+#define CL_USE_DEPRECATED_OPENCL_2_0_APIS
+
+#define CL_TARGET_OPENCL_VERSION 300
+
+#ifdef __APPLE__
+#include "OpenCL/opencl.h"
+#else
+#include "CL/opencl.h" +#endif + +#endif /* CL_H */ \ No newline at end of file diff --git a/lib/harmony/resources/miniopencl/mini_opencl.go b/lib/harmony/resources/miniopencl/mini_opencl.go new file mode 100644 index 000000000..a6bac9582 --- /dev/null +++ b/lib/harmony/resources/miniopencl/mini_opencl.go @@ -0,0 +1,89 @@ +// Package cl was borrowed from the go-opencl library which is more complex and +// doesn't compile well for our needs. +package cl + +// #include "cl.h" +import "C" +import ( + "fmt" + "unsafe" +) + +const maxPlatforms = 32 + +type Platform struct { + id C.cl_platform_id +} + +// Obtain the list of platforms available. +func GetPlatforms() ([]*Platform, error) { + var platformIds [maxPlatforms]C.cl_platform_id + var nPlatforms C.cl_uint + if err := C.clGetPlatformIDs(C.cl_uint(maxPlatforms), &platformIds[0], &nPlatforms); err != C.CL_SUCCESS { + return nil, toError(err) + } + platforms := make([]*Platform, nPlatforms) + for i := 0; i < int(nPlatforms); i++ { + platforms[i] = &Platform{id: platformIds[i]} + } + return platforms, nil +} + +const maxDeviceCount = 64 + +type DeviceType uint + +const ( + DeviceTypeAll DeviceType = C.CL_DEVICE_TYPE_ALL +) + +type Device struct { + id C.cl_device_id +} + +func (p *Platform) GetAllDevices() ([]*Device, error) { + var deviceIds [maxDeviceCount]C.cl_device_id + var numDevices C.cl_uint + var platformId C.cl_platform_id + if p != nil { + platformId = p.id + } + if err := C.clGetDeviceIDs(platformId, C.cl_device_type(DeviceTypeAll), C.cl_uint(maxDeviceCount), &deviceIds[0], &numDevices); err != C.CL_SUCCESS { + return nil, toError(err) + } + if numDevices > maxDeviceCount { + numDevices = maxDeviceCount + } + devices := make([]*Device, numDevices) + for i := 0; i < int(numDevices); i++ { + devices[i] = &Device{id: deviceIds[i]} + } + return devices, nil +} + +func toError(code C.cl_int) error { + return ErrOther(code) +} + +type ErrOther int + +func (e ErrOther) Error() string { + return fmt.Sprintf("cl: error %d", int(e)) +} + +// Size of global device memory in bytes. 
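+// Reported once per device; resources.getGpuRam collects these into its
+// per-GPU slice.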
+func (d *Device) GlobalMemSize() int64 {
+	val, _ := d.getInfoUlong(C.CL_DEVICE_GLOBAL_MEM_SIZE, true)
+	return val
+}
+
+func (d *Device) getInfoUlong(param C.cl_device_info, panicOnError bool) (int64, error) {
+	var val C.cl_ulong
+	if err := C.clGetDeviceInfo(d.id, param, C.size_t(unsafe.Sizeof(val)), unsafe.Pointer(&val), nil); err != C.CL_SUCCESS {
+		if panicOnError {
+			panic("Should never fail")
+		}
+		return 0, toError(err)
+	}
+	return int64(val), nil
+}
diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go
new file mode 100644
index 000000000..115859d75
--- /dev/null
+++ b/lib/harmony/resources/resources.go
@@ -0,0 +1,170 @@
+package resources
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os/exec"
+	"regexp"
+	"runtime"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/pbnjay/memory"
+	"github.com/samber/lo"
+	"golang.org/x/sys/unix"
+
+	ffi "github.com/filecoin-project/filecoin-ffi"
+
+	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+	cl "github.com/filecoin-project/lotus/lib/harmony/resources/miniopencl"
+)
+
+var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time w/o minute heartbeats
+
+type Resources struct {
+	Cpu       int
+	Gpu       float64
+	GpuRam    []uint64
+	Ram       uint64
+	MachineID int
+}
+type Reg struct {
+	Resources
+	shutdown atomic.Bool
+}
+
+var logger = logging.Logger("harmonytask")
+
+var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver")
+
+func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
+	var reg Reg
+	var err error
+	reg.Resources, err = getResources()
+	if err != nil {
+		return nil, err
+	}
+	ctx := context.Background()
+	{ // Learn our owner_id while updating harmony_machines
+		var ownerID []int
+		err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort)
+		if err != nil {
+			return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
+		}
+		if len(ownerID) == 0 {
+			err = db.QueryRow(ctx, `INSERT INTO harmony_machines
+				(host_and_port, cpu, ram, gpu, gpuram) VALUES
+				($1,$2,$3,$4,$5) RETURNING id`,
+				hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam).Scan(&reg.Resources.MachineID)
+			if err != nil {
+				return nil, err
+			}
+
+		} else {
+			reg.MachineID = ownerID[0]
+			_, err := db.Exec(ctx, `UPDATE harmony_machines SET
+				cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$5`,
+				reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam, reg.Resources.MachineID)
+			if err != nil {
+				return nil, err
+			}
+		}
+		cleaned := CleanupMachines(context.Background(), db)
+		logger.Infow("Cleaned up machines", "count", cleaned)
+	}
+	go func() {
+		for {
+			time.Sleep(time.Minute)
+			if reg.shutdown.Load() {
+				return
+			}
+			// Heartbeat only our own row, or dead-machine cleanup never fires.
+			_, err := db.Exec(ctx, `UPDATE harmony_machines SET last_contact=CURRENT_TIMESTAMP WHERE id=$1`, reg.MachineID)
+			if err != nil {
+				logger.Error("Cannot keepalive ", err)
+			}
+		}
+	}()
+
+	return &reg, nil
+}
+func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
+	ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
+		time.Now().Add(-1*LOOKS_DEAD_TIMEOUT))
+	if err != nil {
+		logger.Warn("unable to delete old machines: ", err)
+	}
+	return ct
+}
+
+func (res *Reg) Shutdown() {
+	res.shutdown.Store(true)
+}
+
+func getResources() (res Resources, err error) {
+	b, err := exec.Command(`ps`, `-ef`).CombinedOutput()
+	if err != nil {
+		logger.Warn("Could not safety check for 2+ processes: ", err)
+	} else {
+		found := 0
+		for _, b := range bytes.Split(b, []byte("\n")) {
+			if lotusRE.Match(b) {
found++ + } + } + if found > 1 { + logger.Warn("lotus-provider's defaults are for running alone. Use task maximums or CGroups.") + } + } + + res = Resources{ + Cpu: runtime.NumCPU(), + Ram: memory.FreeMemory(), + GpuRam: getGpuRam(), + } + + { // GPU boolean + gpus, err := ffi.GetGPUDevices() + if err != nil { + logger.Errorf("getting gpu devices failed: %+v", err) + } + all := strings.ToLower(strings.Join(gpus, ",")) + if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") { + res.Gpu = 1 + } + } + + return res, nil +} + +func getGpuRam() (res []uint64) { + platforms, err := cl.GetPlatforms() + if err != nil { + logger.Error(err) + return res + } + + lo.ForEach(platforms, func(p *cl.Platform, i int) { + d, err := p.GetAllDevices() + if err != nil { + logger.Error(err) + return + } + lo.ForEach(d, func(d *cl.Device, i int) { + res = append(res, uint64(d.GlobalMemSize())) + }) + }) + return res +} + +func DiskFree(path string) (uint64, error) { + s := unix.Statfs_t{} + err := unix.Statfs(path, &s) + if err != nil { + return 0, err + } + + return s.Bfree * uint64(s.Bsize), nil +}
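
For orientation, a minimal sketch of how these pieces are meant to be wired together in a host process (illustrative only: the myTask implementation, address, and logger are placeholders, not code from this diff):

	engine, err := harmonytask.New(db, []harmonytask.TaskInterface{&myTask{}}, "myhost:12300")
	if err != nil {
		log.Fatal(err)
	}
	// Serve /scheduler/follows/... and /scheduler/bump/... so peers can fast-path work here.
	go func() { _ = http.ListenAndServe(":12300", engine.GetHttpHandlers()) }()
	defer engine.GracefullyTerminate(time.Minute) // finish or release tasks on exit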