Merge pull request #11165 from filecoin-project/feat/harmonytask

feat:miner:harmonytask
Andrew Jackson (Ajax) 2023-08-25 18:15:43 -05:00 committed by GitHub
commit f2a90aecef
14 changed files with 1428 additions and 20 deletions


@ -783,6 +783,12 @@ workflows:
- build
suite: itest-harmonydb
target: "./itests/harmonydb_test.go"
- test:
name: test-itest-harmonytask
requires:
- build
suite: itest-harmonytask
target: "./itests/harmonytask_test.go"
- test:
name: test-itest-lite_migration
requires:

go.mod

@ -128,11 +128,13 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-varint v0.0.7
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/polydawn/refmt v0.89.0
github.com/prometheus/client_golang v1.16.0
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/raulk/clock v1.1.0
github.com/raulk/go-watchdog v1.3.0
github.com/samber/lo v1.38.1
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/urfave/cli/v2 v2.25.5
@ -156,7 +158,7 @@ require (
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.10.0
golang.org/x/sync v0.2.0
golang.org/x/sys v0.9.0
golang.org/x/sys v0.10.0
golang.org/x/term v0.9.0
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
golang.org/x/tools v0.9.1
@ -203,6 +205,7 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/gin-gonic/gin v1.9.1 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
@ -290,7 +293,6 @@ require (
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@ -311,7 +313,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.0.1 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect

go.sum

@ -142,6 +142,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/buger/goterm v1.0.3 h1:7V/HeAQHrzPk/U4BvyH2g9u+xbUW9nr4yRPyG59W4fM=
github.com/buger/goterm v1.0.3/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -152,6 +153,7 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM=
github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ=
@ -386,6 +388,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gbrlsnchs/jwt/v3 v3.0.1 h1:lbUmgAKpxnClrKloyIwpxm4OuWeDl5wLk52G91ODPw4=
github.com/gbrlsnchs/jwt/v3 v3.0.1/go.mod h1:AncDcjXz18xetI3A6STfXq2w+LuTx8pQ8bGEwRN8zVM=
github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko=
@ -397,8 +400,9 @@ github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs=
@ -443,12 +447,12 @@ github.com/go-openapi/swag v0.19.8/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfT
github.com/go-openapi/swag v0.19.11 h1:RFTu/dlFySpyVvJDfp/7674JY4SDglYWKztbiIGFpmc=
github.com/go-openapi/swag v0.19.11/go.mod h1:Uc0gKkdR+ojzsEpjh39QChyu92vPgIr72POcgHMAgSY=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
@ -464,6 +468,7 @@ github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
@ -968,8 +973,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
@ -1405,7 +1410,9 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk=
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw=
@ -1512,6 +1519,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
@ -1618,16 +1627,16 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
@ -1789,6 +1798,7 @@ go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1
go4.org v0.0.0-20200411211856-f5505b9728dd/go.mod h1:CIiUVy99QCPfoE13bO4EZaz5GZMZXMSBGhxRdsvzbkg=
go4.org v0.0.0-20230225012048-214862532bf5 h1:nifaUDeh+rPaBCMPMQHZmvJf+QdpLFnuQPwx+LxVmtc=
go4.org v0.0.0-20230225012048-214862532bf5/go.mod h1:F57wTi5Lrj6WLyswp5EYV1ncrEbFGHD4hhz6S1ZYeaU=
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -2064,8 +2074,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

itests/harmonytask_test.go

@ -0,0 +1,255 @@
package itests
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/node/impl"
)
type task1 struct {
toAdd []int
myPersonalTableLock sync.Mutex
myPersonalTable map[harmonytask.TaskID]int // This would typically be a DB table
WorkCompleted []string
}
func withDbSetup(t *testing.T, f func(*kit.TestMiner)) {
_, miner, _ := kit.EnsembleMinimal(t,
kit.LatestActorsAt(-1),
kit.MockProofs(),
)
f(miner)
}
func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
if !stillOwned() {
return false, errors.New("Why not still owned?")
}
t.myPersonalTableLock.Lock()
defer t.myPersonalTableLock.Unlock()
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID]))
return true, nil
}
func (t *task1) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return &list[0], nil
}
func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 100,
Name: "ThingOne",
MaxFailures: 1,
Cost: resources.Resources{
Cpu: 1,
Ram: 100 << 10, // at 100kb, it's tiny
},
}
}
func (t *task1) Adder(add harmonytask.AddTaskFunc) {
for _, vTmp := range t.toAdd {
v := vTmp
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
t.myPersonalTableLock.Lock()
defer t.myPersonalTableLock.Unlock()
t.myPersonalTable[tID] = v
return true
})
}
}
func TestHarmonyTasks(t *testing.T) {
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
t1 := &task1{
toAdd: []int{56, 73},
myPersonalTable: map[harmonytask.TaskID]int{},
}
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKINESS RISK HERE.
e.GracefullyTerminate(time.Minute)
expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted)
require.Equal(t, expected, t1.WorkCompleted, "unexpected results")
})
}
type passthru struct {
dtl harmonytask.TaskTypeDetails
do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error)
canAccept func(list []harmonytask.TaskID) (*harmonytask.TaskID, error)
adder func(add harmonytask.AddTaskFunc)
}
func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
}
func (t *passthru) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return t.canAccept(list)
}
func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails {
return t.dtl
}
func (t *passthru) Adder(add harmonytask.AddTaskFunc) {
if t.adder != nil {
t.adder(add)
}
}
// Common stuff
var dtl = harmonytask.TaskTypeDetails{Name: "foo", Max: -1, Cost: resources.Resources{}}
var lettersMutex sync.Mutex
func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil },
adder: func(add harmonytask.AddTaskFunc) {
for _, vTmp := range []string{"A", "B"} {
v := vTmp
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
_, err := tx.Exec("INSERT INTO itest_scratch (some_int, content) VALUES ($1,$2)", tID, v)
require.NoError(t, err)
return true
})
}
},
}
}
func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
lettersMutex.Lock()
defer lettersMutex.Unlock()
*dest = append(*dest, content)
return true, nil
},
}
}
func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
var dest []string
workerParty := fooLetterSaver(t, cdb, &dest)
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKINESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5)
sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, dest)
})
}
func TestWorkStealing(t *testing.T) {
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
ctx := context.Background()
// The dead worker will be played by a few SQL INSERTS.
_, err := cdb.Exec(ctx, `INSERT INTO harmony_machines
(id, last_contact,host_and_port, cpu, ram, gpu, gpuram)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`)
require.NoError(t, err)
_, err = cdb.Exec(ctx, `INSERT INTO harmony_task
(id, name, owner_id, posted_time, added_by)
VALUES (1234, 'foo', 300, DATE '2000-01-01', 300)`)
require.NoError(t, err)
_, err = cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int, content) VALUES (1234, 'M')")
require.NoError(t, err)
harmonytask.POLL_DURATION = time.Millisecond * 100
harmonytask.CLEANUP_FREQUENCY = time.Millisecond * 100
var dest []string
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKINESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, []string{"M"}, dest)
})
}
func TestTaskRetry(t *testing.T) {
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
harmonytask.POLL_DURATION = time.Millisecond * 100
sender, err := harmonytask.New(cdb, []harmonytask.TaskInterface{senderParty}, "test:1")
require.NoError(t, err)
alreadyFailed := map[string]bool{}
var dest []string
fails2xPerMsg := &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
"SELECT content FROM itest_scratch WHERE some_int=$1", tID).Scan(&content)
require.NoError(t, err)
lettersMutex.Lock()
defer lettersMutex.Unlock()
if !alreadyFailed[content] {
alreadyFailed[content] = true
return false, errors.New("intentional 'error'")
}
dest = append(dest, content)
return true, nil
},
}
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second)
sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour)
sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, dest)
type hist struct {
TaskID int
Result bool
Err string
}
var res []hist
require.NoError(t, cdb.Select(context.Background(), &res,
`SELECT task_id, result, err FROM harmony_task_history
ORDER BY result DESC, task_id`))
require.Equal(t, []hist{
{1, true, ""},
{2, true, ""},
{1, false, "error: intentional 'error'"},
{2, false, "error: intentional 'error'"}}, res)
})
}
/*
FUTURE test fast-pass round-robin via http calls (3party) once the API for that is set
It's necessary for WinningPoSt.
FUTURE test follows.
It's necessary for sealing work.
*/


@ -118,21 +118,25 @@ type tracer struct {
type ctxkey string
const SQL_START = ctxkey("sqlStart")
const SQL_STRING = ctxkey("sqlString")
func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
return context.WithValue(context.WithValue(ctx, SQL_START, time.Now()), SQL_STRING, data.SQL)
}
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
DBMeasures.Hits.M(1)
ms := time.Since(ctx.Value(SQL_START).(time.Time)).Milliseconds()
DBMeasures.TotalWait.M(ms)
DBMeasures.Waits.Observe(float64(ms))
if data.Err != nil {
DBMeasures.Errors.M(1)
}
// Can log what type of query it is, but not what tables
// Can log rows affected.
logger.Debugw("SQL run",
"query", ctx.Value(SQL_STRING).(string),
"err", data.Err,
"rowCt", data.CommandTag.RowsAffected(),
"milliseconds", ms)
}
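
For orientation, this is how such a tracer is typically attached with pgx/v5. The sketch below is illustrative only (the function name and connString handling are assumptions; in this codebase the actual hookup lives in addStatsAndConnect): tracer satisfies pgx.QueryTracer via TraceQueryStart/TraceQueryEnd, so it can be set on the pool's connection config.

	// newTracedPool is a hedged sketch; it assumes import of
	// github.com/jackc/pgx/v5/pgxpool alongside this package's tracer type.
	func newTracedPool(ctx context.Context, connString string) (*pgxpool.Pool, error) {
		cfg, err := pgxpool.ParseConfig(connString)
		if err != nil {
			return nil, err
		}
		cfg.ConnConfig.Tracer = tracer{} // every query now reports hits, waits, and errors
		return pgxpool.NewWithConfig(ctx, cfg)
	}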
// addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB.
@ -250,8 +254,9 @@ func (db *DB) upgrade() error {
}
_, err = db.pgx.Exec(context.Background(), s)
if err != nil {
msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())
db.log(msg)
return errors.New(msg) // makes devs lives easier by placing message at the end.
}
}


@ -2,5 +2,6 @@ CREATE TABLE itest_scratch (
id SERIAL PRIMARY KEY,
content TEXT,
some_int INTEGER,
second_int INTEGER,
update_time TIMESTAMP DEFAULT current_timestamp
)


@ -0,0 +1,52 @@
/* For HarmonyTask base implementation. */
CREATE TABLE harmony_machines (
id SERIAL PRIMARY KEY NOT NULL,
last_contact TIMESTAMP NOT NULL DEFAULT current_timestamp,
host_and_port varchar(300) NOT NULL,
cpu INTEGER NOT NULL,
ram BIGINT NOT NULL,
gpu FLOAT NOT NULL,
gpuram BIGINT NOT NULL
);
CREATE TABLE harmony_task (
id SERIAL PRIMARY KEY NOT NULL,
initiated_by INTEGER,
update_time TIMESTAMP NOT NULL DEFAULT current_timestamp,
posted_time TIMESTAMP NOT NULL,
owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL,
added_by INTEGER NOT NULL,
previous_task INTEGER,
name varchar(8) NOT NULL
);
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines. May be null if between owners or not yet taken.';
COMMENT ON COLUMN harmony_task.name IS 'The name of the task type.';
COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified. Not a heartbeat.';
CREATE TABLE harmony_task_history (
id SERIAL PRIMARY KEY NOT NULL,
task_id INTEGER NOT NULL,
name VARCHAR(8) NOT NULL,
posted TIMESTAMP NOT NULL,
work_start TIMESTAMP NOT NULL,
work_end TIMESTAMP NOT NULL,
result BOOLEAN NOT NULL,
err varchar
);
COMMENT ON COLUMN harmony_task_history.result IS 'Used to determine if this was a successful run.';
CREATE TABLE harmony_task_follow (
id SERIAL PRIMARY KEY NOT NULL,
owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
to_type VARCHAR(8) NOT NULL,
from_type VARCHAR(8) NOT NULL
);
CREATE TABLE harmony_task_impl (
id SERIAL PRIMARY KEY NOT NULL,
owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
name VARCHAR(8) NOT NULL
);
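
This schema is what makes task hand-off safe: ownership is taken with a single conditional UPDATE on harmony_task.owner_id, so at most one machine can win a task. A minimal sketch of that claim step follows (the function name is illustrative; the engine's considerWork performs the equivalent statement):

	// claimTask tries to take ownership of an unowned task row; it returns
	// true only if this machine won the race. Sketch only — names illustrative.
	func claimTask(ctx context.Context, db *harmonydb.DB, machineID, taskID int) (bool, error) {
		ct, err := db.Exec(ctx,
			`UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL`,
			machineID, taskID)
		if err != nil {
			return false, err
		}
		return ct == 1, nil // 0 rows affected means another machine claimed it first
	}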


@ -0,0 +1,79 @@
/*
Package harmonytask implements a pure (no task logic), distributed
task manager. This clean interface allows a task implementer to completely
avoid being concerned with task scheduling and management.
It's based on the idea of tasks as small units of work broken off from other
work by hardware, parallelizability, reliability, or any other reason.
Workers are greedy, vacuuming up their favorite jobs from a list.
Once a task is accepted, harmonydb tries to get other task-runner
machines to accept work (round robin) before trying again to accept.
*
Mental Model:
Things that block tasks:
- task not registered for any running server
- max was specified and reached
- resource exhaustion
- CanAccept() interface (per-task implementation) does not accept it.
Ways tasks start: (slowest first)
- DB Read every 1 minute
- Bump via HTTP if registered in DB
- Task was added (to db) by this process
Ways tasks get added:
- Async Listener task (for chain, etc)
- Followers: Tasks get added because another task completed
When Follower collectors run:
- If both sides are process-local, then this process will pick it up.
- If properly registered already, the http endpoint will be tried to start it.
- Otherwise, at the listen interval during db scrape it will be found.
How duplicate tasks are avoided:
- that's up to the task definition, but probably a unique key
*
To use:
1. Implement TaskInterface for a new task.
2. Have New() receive this & all other ACTIVE implementations. (A minimal
wiring sketch follows below.)
*
As we are not expecting DBAs in this database, it's important to know
what grows uncontrolled. The only harmony_* tables that grow are
harmony_task_history (somewhat quickly) and harmony_machines (slowly).
These will need clean-up once their task data can no longer be acted upon.
But the design **requires** extraInfo tables to grow until the task's
info could not possibly be used by a following task, including a slow
release rollout. This would normally be on the order of months.
*
Other possible enhancements include more collaborative coordination
to assign a task to machines closer to the data.
__Database_Behavior__
harmony_task is the list of work that has not been completed.
AddTaskFunc manages the additions, but is designed to have its
transactions failed-out on overlap with a similar task already written.
It's up to the TaskInterface implementer to discover this overlap via
some other table it uses (since overlap can mean very different things).
harmony_task_history
This holds transactions that completed or saw too many retries. It also
serves as input for subsequent (follower) tasks to kick off. This is not
done machine-internally because a follower may not be on the same machine
as the previous task.
harmony_machines
Managed by lib/harmony/resources, this is a reference to machines registered
via the resources. This registration does not obligate the machine to
anything, but serves as a discovery mechanism. Paths are hostnames + ports
which are presumed to support http, but this assumption is only used by
the task system.
harmony_task_follow / harmony_task_impl
These tables are used to fast-path notifications to other machines instead
of waiting for polling. _impl helps round-robin work pick-up. _follow helps
discover the machines that are interested in creating tasks following the
task that just completed.
*/
package harmonytask
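
To make the "To use" recipe concrete, here is a minimal wiring sketch in a consumer package. Only the harmonytask, harmonydb, and resources identifiers come from this commit; cleanupTask, its settings, and run() are illustrative assumptions (imports: time, harmonydb, harmonytask, resources):

	type cleanupTask struct{} // hypothetical task type

	func (cleanupTask) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
		// ... do the work, consulting stillOwned() before single-writer changes ...
		return true, nil
	}

	func (cleanupTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
		return &ids[0], nil // accept the first task offered
	}

	func (cleanupTask) TypeDetails() harmonytask.TaskTypeDetails {
		return harmonytask.TaskTypeDetails{
			Name:        "cleanup1", // at most 8 chars, per the schema's varchar(8)
			Max:         10,
			MaxFailures: 3,
			Cost:        resources.Resources{Cpu: 1, Ram: 1 << 20},
		}
	}

	func (cleanupTask) Adder(add harmonytask.AddTaskFunc) {
		add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) bool {
			// Write this task's extraInfo row here; return false to abort the
			// transaction, e.g. when a unique key shows the task already exists.
			return true
		})
	}

	func run(db *harmonydb.DB) error {
		e, err := harmonytask.New(db, []harmonytask.TaskInterface{cleanupTask{}}, "myhost:9999")
		if err != nil {
			return err
		}
		// Serve e.GetHttpHandlers() at myhost:9999 so peers get fast-path
		// bumps & follows; then, on shutdown:
		e.GracefullyTerminate(time.Minute)
		return nil
	}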


@ -0,0 +1,408 @@
package harmonytask
import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/mux"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/resources"
)
// Consts (except for unit test)
var POLL_DURATION = time.Minute // Poll for Work this frequently
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Negative means unrestricted.
Max int
// Name is the task name to be added to the task list.
Name string
// Peak costs to Do() the task.
Cost resources.Resources
// Max Failure count before the job is dropped.
// 0 = retry forever
MaxFailures uint
// Follow another task's completion via this task's creation.
// The function should populate extraInfo from data
// available from the previous task's tables, using the given TaskID.
// It should also return success if the trigger succeeded.
// NOTE: if refactoring tasks, see if your task is
// necessary. Ex: Is the sector state correct for your stage to run?
Follows map[string]func(TaskID, AddTaskFunc) bool
}
// TaskInterface must be implemented in order to have a task used by harmonytask.
type TaskInterface interface {
// Do the task assigned. Call stillOwned before making single-writer-only
// changes to ensure the work has not been stolen.
// This is the ONLY function that should attempt to do the work, and must
// ONLY be called by harmonytask.
// Indicate that the task no longer needs scheduling with done=true, including
// cases where it's past the deadline.
Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
// CanAccept should return if the task can run on this machine. It should
// return nil if the task type is not allowed on this machine.
// It should select the task it most wants to accomplish.
// It is also responsible for determining & reserving disk space (including scratch).
CanAccept([]TaskID) (*TaskID, error)
// TypeDetails() returns static details about how this task behaves and
// how this machine will run it. Read once at the beginning.
TypeDetails() TaskTypeDetails
// This listener will consume all external sources continuously for work.
// Do() may also be called from a backlog of work. This must not
// start doing the work (it still must be scheduled).
// Note: Task de-duplication should happen in ExtraInfoFunc by
// returning false, typically by determining from the tx that the work
// exists already. The easy way is to have a unique joint index
// across all fields that will be common.
// Adder should typically only add its own task type, but multiple
// is possible for when 1 trigger starts 2 things.
// Usage Example:
// func (b *BazType)Adder(addTask AddTaskFunc) {
// for {
// bazMaker := <- bazChannel
// addTask("baz", func(t harmonytask.TaskID, txn db.Transaction) bool {
// _, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot)
// VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot)
// if err != nil {
// scream(err)
// return false
// }
// return true
// })
// }
// }
Adder(AddTaskFunc)
}
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) bool)
type TaskEngine struct {
ctx context.Context
handlers []*taskTypeHandler
db *harmonydb.DB
workAdderMutex sync.Mutex
reg *resources.Reg
grace context.CancelFunc
taskMap map[string]*taskTypeHandler
ownerID int
tryAllWork chan bool // notify if work completed
follows map[string][]followStruct
lastFollowTime time.Time
lastCleanup atomic.Value
}
type followStruct struct {
f func(TaskID, AddTaskFunc) bool
h *taskTypeHandler
}
type TaskID int
// New creates all the task definitions. Note that TaskEngine
// knows nothing about the tasks themselves and serves as a
// generic container for common task-management work.
func New(
db *harmonydb.DB,
impls []TaskInterface,
hostnameAndPort string) (*TaskEngine, error) {
reg, err := resources.Register(db, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("cannot get resources: %w", err)
}
ctx, grace := context.WithCancel(context.Background())
e := &TaskEngine{
ctx: ctx,
grace: grace,
db: db,
reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct),
}
e.lastCleanup.Store(time.Now())
for _, c := range impls {
h := taskTypeHandler{
TaskInterface: c,
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
}
e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name)
VALUES ($1,$2)`, e.ownerID, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update impl: %w", err)
}
for name, fn := range c.TypeDetails().Follows {
e.follows[name] = append(e.follows[name], followStruct{fn, &h})
// populate harmony_task_follow
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follow (owner_id, from_type, to_type)
VALUES ($1,$2,$3)`, e.ownerID, name, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update harmony_task_follow: %w", err)
}
}
}
// resurrect old work
{
var taskRet []struct {
ID int
Name string
}
err := db.Select(e.ctx, &taskRet, `SELECT id, name from harmony_task WHERE owner_id=$1`, e.ownerID)
if err != nil {
return nil, err
}
for _, w := range taskRet {
// edge-case: if old assignments are not available tasks, unlock them.
h := e.taskMap[w.Name]
if h == nil {
_, err := db.Exec(e.ctx, `UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, w.ID)
if err != nil {
log.Errorw("Cannot remove self from owner field", "error", err)
}
continue // no handler registered here; leave the task for another machine
}
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
}
for _, h := range e.handlers {
go h.Adder(h.AddTask)
}
go e.poller()
return e, nil
}
// GracefullyTerminate blocks until all present tasks have completed.
// Call this to cleanly exit the process. As some tasks are long-running,
// the deadline skips waiting on those still running (to be picked up later).
func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
e.grace()
e.reg.Shutdown()
deadlineChan := time.NewTimer(deadline).C
ctx := context.TODO()
// block bumps & follows by unreg from DBs.
_, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
_, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
top:
for _, h := range e.handlers {
if h.Count.Load() > 0 {
select {
case <-deadlineChan:
return
default:
time.Sleep(time.Millisecond)
goto top
}
}
}
}
func (e *TaskEngine) poller() {
ticker := time.NewTicker(POLL_DURATION)
defer ticker.Stop() // one ticker for the loop; re-creating it per select leaks timers
for {
select {
case <-e.tryAllWork: ///////////////////// Find work after some work finished
case <-ticker.C: ///////////////////////// Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit
return
}
e.followWorkInDB() // "Follows" the slow way
e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way
}
}
// followWorkInDB implements "Follows" the slow way
func (e *TaskEngine) followWorkInDB() {
// Step 1: What are we following?
var lastFollowTime time.Time
lastFollowTime, e.lastFollowTime = e.lastFollowTime, time.Now()
for fromName, srcs := range e.follows {
var cList []int // Which work is done (that we follow) since we last checked?
err := e.db.Select(e.ctx, &cList, `SELECT h.task_id FROM harmony_task_history h
WHERE h.work_end>$1 AND h.name=$2`, lastFollowTime, fromName)
if err != nil {
log.Error("Could not query DB: ", err)
return
}
for _, src := range srcs {
for _, workAlreadyDone := range cList { // Were any tasks made to follow these tasks?
var ct int
err := e.db.QueryRow(e.ctx, `SELECT COUNT(*) FROM harmony_task
WHERE name=$1 AND previous_task=$2`, src.h.Name, workAlreadyDone).Scan(&ct)
if err != nil {
log.Error("Could not query harmony_task: ", err)
return // not recoverable here
}
if ct > 0 {
continue
}
// we need to create this task
if !src.h.Follows[fromName](TaskID(workAlreadyDone), src.h.AddTask) {
// But someone may have beaten us to it.
log.Debugf("Unable to add task %s following Task(%d, %s)", src.h.Name, workAlreadyDone, fromName)
}
}
}
}
}
// pollerTryAllWork implements "Bumps" (next task) the slow way
func (e *TaskEngine) pollerTryAllWork() {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
e.lastCleanup.Store(time.Now())
resources.CleanupMachines(e.ctx, e.db)
}
for _, v := range e.handlers {
rerun:
if v.AssertMachineHasCapacity() != nil {
continue
}
var unownedTasks []TaskID
err := e.db.Select(e.ctx, &unownedTasks, `SELECT id
FROM harmony_task
WHERE owner_id IS NULL AND name=$1
ORDER BY update_time`, v.Name)
if err != nil {
log.Error("Unable to read work ", err)
continue
}
accepted := v.considerWork("poller", unownedTasks)
if !accepted {
log.Warn("Work not accepted")
continue
}
if len(unownedTasks) > 1 {
e.bump(v.Name) // wait for others before trying again to add work.
goto rerun
}
}
}
// GetHttpHandlers needs to be used by the http server to register routes.
// This implements the receiver-side of "follows" and "bumps" the fast way.
func (e *TaskEngine) GetHttpHandlers() http.Handler {
root := mux.NewRouter()
s := root.PathPrefix("/scheduler").Subrouter()
f := s.PathPrefix("/follows").Subrouter()
b := s.PathPrefix("/bump").Subrouter()
for name, vsTmp := range e.follows {
vs := vsTmp
f.Path("/" + name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err.Error())
return
}
taskAdded := false
for _, vTmp := range vs {
v := vTmp
taskAdded = taskAdded || v.f(TaskID(tID), v.h.AddTask)
}
if taskAdded {
e.tryAllWork <- true
w.WriteHeader(200)
return
}
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
})
}
for _, hTmp := range e.handlers {
h := hTmp
b.Path("/" + h.Name + "/{tID}").Methods("GET").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tIDString := mux.Vars(r)["tID"]
tID, err := strconv.Atoi(tIDString)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err.Error())
return
}
// We NEED to block while trying to deliver
// this work to ease the network impact.
if h.considerWork("bump", []TaskID{TaskID(tID)}) {
w.WriteHeader(200)
return
}
w.WriteHeader(202) // NOTE: 202 for "accepted" but not worked.
})
}
return root
}
func (e *TaskEngine) bump(taskType string) {
var res []string
err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m
JOIN harmony_task_impl i ON i.owner_id=m.id
WHERE i.name=$1`, taskType)
if err != nil {
log.Error("Could not read db for bump: ", err)
return
}
for _, url := range res {
resp, err := hClient.Get("http://" + url + "/scheduler/bump/" + taskType)
if err != nil {
log.Info("Server unreachable to bump: ", err)
continue
}
if resp.StatusCode == 200 {
return // just want 1 taker.
}
}
}
// resourcesInUse requires workListsMutex to be already locked.
func (e *TaskEngine) resourcesInUse() resources.Resources {
tmp := e.reg.Resources
tmp.GpuRam = make([]uint64, len(e.reg.Resources.GpuRam))
copy(tmp.GpuRam, e.reg.Resources.GpuRam) // deep copy so we don't mutate the registry's slice
for _, t := range e.handlers {
ct := t.Count.Load()
tmp.Cpu -= int(ct) * t.Cost.Cpu
tmp.Gpu -= float64(ct) * t.Cost.Gpu
tmp.Ram -= uint64(ct) * t.Cost.Ram
if len(t.Cost.GpuRam) == 0 {
continue // this task type consumes no GPU RAM
}
for i := int32(0); i < ct; i++ {
found := false
for grIdx, j := range tmp.GpuRam {
if j >= t.Cost.GpuRam[0] {
tmp.GpuRam[grIdx] = j - t.Cost.GpuRam[0]
found = true
break
}
}
if !found {
log.Warn("We should never get out of gpuram for what's consumed.")
}
}
}
return tmp
}


@ -0,0 +1,292 @@
package harmonytask
import (
"context"
"errors"
"io"
"net/http"
"strconv"
"sync/atomic"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
var log = logging.Logger("harmonytask")
type taskTypeHandler struct {
TaskInterface
TaskTypeDetails
TaskEngine *TaskEngine
Count atomic.Int32
}
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) bool) {
var tID TaskID
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
// create taskID (from DB), returning it directly to avoid racing a separate SELECT
err := tx.QueryRow(`INSERT INTO harmony_task (name, added_by, posted_time)
VALUES ($1, $2, CURRENT_TIMESTAMP) RETURNING id`, h.Name, h.TaskEngine.ownerID).Scan(&tID)
if err != nil {
log.Error("Could not insert into harmony_task: ", err)
return false
}
return extra(tID, tx)
})
if err != nil {
log.Error(err)
}
if !did {
return
}
if !h.considerWork("adder", []TaskID{tID}) {
h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
}
}
func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
top:
if len(ids) == 0 {
return true // stop looking for takers
}
// 1. Can we do any more of this task type?
if h.Max > -1 && int(h.Count.Load()) >= h.Max {
log.Infow("did not accept task", "name", h.Name, "reason", "at max already")
return false
}
h.TaskEngine.workAdderMutex.Lock()
defer h.TaskEngine.workAdderMutex.Unlock()
// 2. Can we do any more work?
err := h.AssertMachineHasCapacity()
if err != nil {
log.Info(err)
return false
}
// 3. What does the impl say?
tID, err := h.CanAccept(ids)
if err != nil {
log.Error(err)
return false
}
if tID == nil {
log.Infow("did not accept task", "task_id", ids[0], "reason", "CanAccept() refused")
return false
}
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken")
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
}
}
ids = tryAgain
goto top
}
go func() {
h.Count.Add(1)
log.Infow("Beginning work on Task", "id", *tID, "from", from, "type", h.Name)
var done bool
var doErr error
workStart := time.Now()
defer func() {
if r := recover(); r != nil {
log.Error("Recovered from a serious error "+
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r)
}
h.Count.Add(-1)
h.recordCompletion(*tID, workStart, done, doErr)
if done {
h.triggerCompletionListeners(*tID)
}
h.TaskEngine.tryAllWork <- true // Activate tasks in this machine
}()
done, doErr = h.Do(*tID, func() bool {
var owner int
// Background here because we don't want GracefullyTerminate to cancel this ownership check.
err := h.TaskEngine.db.QueryRow(context.Background(),
`SELECT owner_id FROM harmony_task WHERE id=$1`, *tID).Scan(&owner)
if err != nil {
log.Error("Cannot determine ownership: ", err)
return false
}
return owner == h.TaskEngine.ownerID
})
if doErr != nil {
log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr)
}
}()
return true
}
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
workEnd := time.Now()
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) bool {
var postedTime time.Time
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
if err != nil {
log.Error("Could not log completion: ", err)
return false
}
result := "unspecified error"
if done {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
log.Error("Could not log completion: ", err)
return false
}
result = ""
} else {
if doErr != nil {
result = "error: " + doErr.Error()
}
var deleteTask bool
if h.MaxFailures > 0 {
ct := uint(0)
err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history
WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct)
if err != nil {
log.Error("Could not read task history:", err)
return false
}
if ct >= h.MaxFailures {
deleteTask = true
}
}
if deleteTask {
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {
log.Error("Could not delete failed job: ", err)
return false
}
// Note: Extra Info is left lying around for later review & clean-up
} else {
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL WHERE id=$1`, tID)
if err != nil {
log.Error("Could not disown failed task: ", tID, err)
return false
}
}
}
_, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, err)
VALUES ($1, $2, $3, $4, $5, $6, $7)`, tID, h.Name, postedTime, workStart, workEnd, done, result)
if err != nil {
log.Error("Could not write history: ", err)
return false
}
return true
})
if err != nil {
log.Error("Could not record transaction: ", err)
return
}
if !cm {
log.Error("Committing the task records failed")
}
}
func (h *taskTypeHandler) AssertMachineHasCapacity() error {
r := h.TaskEngine.resourcesInUse()
if r.Cpu-h.Cost.Cpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of cpu")
}
if h.Cost.Ram > r.Ram {
return errors.New("Did not accept " + h.Name + " task: out of RAM")
}
if r.Gpu-h.Cost.Gpu < 0 {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
}
if len(h.Cost.GpuRam) == 0 {
goto enoughGpuRam // task needs no GPU RAM
}
for _, u := range r.GpuRam {
if u > h.Cost.GpuRam[0] {
goto enoughGpuRam
}
}
return errors.New("Did not accept " + h.Name + " task: out of GPURam")
enoughGpuRam:
return nil
}
var hClient = http.Client{Timeout: 3 * time.Second}
// triggerCompletionListeners does in order:
// 1. Trigger all in-process followers (b/c it's fast).
// 2. Trigger all living processes with followers via DB
// 3. Future followers (think partial upgrade) can read harmony_task_history
// 3a. The Listen() handles slow follows.
func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
// InProcess (#1 from Description)
inProcessDefs := h.TaskEngine.follows[h.Name]
inProcessFollowers := make([]string, 0, len(inProcessDefs))
for _, fs := range inProcessDefs {
if fs.f(tID, fs.h.AddTask) {
inProcessFollowers = append(inProcessFollowers, fs.h.Name)
}
}
// Over HTTP (#2 from Description)
var hps []struct {
HostAndPort string
ToType string
}
err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type
FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id
WHERE from_type=$1 AND to_type <> ALL($2) AND f.owner_id != $3`,
h.Name, inProcessFollowers, h.TaskEngine.ownerID)
if err != nil {
log.Warn("Could not fast-trigger partner processes.", err)
return
}
hostsVisited := map[string]bool{}
tasksVisited := map[string]bool{}
for _, v := range hps {
if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] {
continue
}
resp, err := hClient.Get("http://" + v.HostAndPort + "/scheduler/follows/" + h.Name + "/" + strconv.Itoa(int(tID)))
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
log.Error("IO failed for fast nudge: ", string(b))
continue
}
}
}


@ -0,0 +1,22 @@
//go:build darwin || freebsd || openbsd || dragonfly || netbsd
// +build darwin freebsd openbsd dragonfly netbsd
package resources
import (
"encoding/binary"
"syscall"
)
func sysctlUint64(name string) (uint64, error) {
s, err := syscall.Sysctl(name)
if err != nil {
return 0, err
}
// hack because the string conversion above drops a \0
b := []byte(s)
for len(b) < 8 {
b = append(b, 0)
}
return binary.LittleEndian.Uint64(b), nil
}
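
A typical use of this helper, assuming the conventional sysctl key names (hw.memsize on darwin, hw.physmem on the BSDs — an assumption of this sketch, not something this commit pins down):

	// totalRAM is a sketch: it returns total physical memory in bytes using
	// the usual sysctl keys; which key exists depends on the platform.
	func totalRAM() (uint64, error) {
		if v, err := sysctlUint64("hw.memsize"); err == nil { // darwin
			return v, nil
		}
		return sysctlUint64("hw.physmem") // freebsd/openbsd/netbsd/dragonfly
	}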


@ -0,0 +1,17 @@
#ifndef CL_H
#define CL_H
#define CL_USE_DEPRECATED_OPENCL_1_1_APIS
#define CL_USE_DEPRECATED_OPENCL_1_2_APIS
#define CL_USE_DEPRECATED_OPENCL_2_0_APIS
#define CL_TARGET_OPENCL_VERSION 300
#ifdef __APPLE__
#include "OpenCL/opencl.h"
#else
#include "CL/opencl.h"
#endif
#endif /* CL_H */


@ -0,0 +1,89 @@
// Package cl was borrowed from the go-opencl library which is more complex and
// doesn't compile well for our needs.
package cl
// #include "cl.h"
import "C"
import (
"fmt"
"unsafe"
)
const maxPlatforms = 32
type Platform struct {
id C.cl_platform_id
}
// GetPlatforms obtains the list of available platforms.
func GetPlatforms() ([]*Platform, error) {
var platformIds [maxPlatforms]C.cl_platform_id
var nPlatforms C.cl_uint
if err := C.clGetPlatformIDs(C.cl_uint(maxPlatforms), &platformIds[0], &nPlatforms); err != C.CL_SUCCESS {
return nil, toError(err)
}
platforms := make([]*Platform, nPlatforms)
for i := 0; i < int(nPlatforms); i++ {
platforms[i] = &Platform{id: platformIds[i]}
}
return platforms, nil
}
const maxDeviceCount = 64
type DeviceType uint
const (
DeviceTypeAll DeviceType = C.CL_DEVICE_TYPE_ALL
)
type Device struct {
id C.cl_device_id
}
func (p *Platform) GetAllDevices() ([]*Device, error) {
var deviceIds [maxDeviceCount]C.cl_device_id
var numDevices C.cl_uint
var platformId C.cl_platform_id
if p != nil {
platformId = p.id
}
if err := C.clGetDeviceIDs(platformId, C.cl_device_type(DeviceTypeAll), C.cl_uint(maxDeviceCount), &deviceIds[0], &numDevices); err != C.CL_SUCCESS {
return nil, toError(err)
}
if numDevices > maxDeviceCount {
numDevices = maxDeviceCount
}
devices := make([]*Device, numDevices)
for i := 0; i < int(numDevices); i++ {
devices[i] = &Device{id: deviceIds[i]}
}
return devices, nil
}
func toError(code C.cl_int) error {
return ErrOther(code)
}
type ErrOther int
func (e ErrOther) Error() string {
return fmt.Sprintf("cl: error %d", int(e))
}
// GlobalMemSize returns the size of global device memory in bytes.
func (d *Device) GlobalMemSize() int64 {
val, _ := d.getInfoUlong(C.CL_DEVICE_GLOBAL_MEM_SIZE, true)
return val
}
func (d *Device) getInfoUlong(param C.cl_device_info, panicOnError bool) (int64, error) {
var val C.cl_ulong
if err := C.clGetDeviceInfo(d.id, param, C.size_t(unsafe.Sizeof(val)), unsafe.Pointer(&val), nil); err != C.CL_SUCCESS {
if panicOnError {
panic("Should never fail")
}
return 0, toError(err)
}
return int64(val), nil
}


@ -0,0 +1,170 @@
package resources
import (
"bytes"
"context"
"fmt"
"os/exec"
"regexp"
"runtime"
"strings"
"sync/atomic"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/pbnjay/memory"
"github.com/samber/lo"
"golang.org/x/sys/unix"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
cl "github.com/filecoin-project/lotus/lib/harmony/resources/miniopencl"
)
var LOOKS_DEAD_TIMEOUT = 10 * time.Minute // Time without (minutely) heartbeats before a machine is presumed dead
type Resources struct {
Cpu int
Gpu float64
GpuRam []uint64
Ram uint64
MachineID int
}
type Reg struct {
Resources
shutdown atomic.Bool
}
var logger = logging.Logger("harmonytask")
var lotusRE = regexp.MustCompile("lotus-worker|lotus-harmony|yugabyted|yb-master|yb-tserver")
func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
var reg Reg
var err error
reg.Resources, err = getResources()
if err != nil {
return nil, err
}
ctx := context.Background()
{ // Learn our owner_id while updating harmony_machines
var ownerID []int
err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
}
if len(ownerID) == 0 {
err = db.QueryRow(ctx, `INSERT INTO harmony_machines
(host_and_port, cpu, ram, gpu, gpuram) VALUES
($1,$2,$3,$4,$5) RETURNING id`,
hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam).Scan(&reg.Resources.MachineID)
if err != nil {
return nil, err
}
} else {
reg.MachineID = ownerID[0]
_, err := db.Exec(ctx, `UPDATE harmony_machines SET
cpu=$1, ram=$2, gpu=$3, gpuram=$4 WHERE id=$5`,
reg.Cpu, reg.Ram, reg.Gpu, reg.GpuRam, reg.Resources.MachineID)
if err != nil {
return nil, err
}
}
cleaned := CleanupMachines(context.Background(), db)
logger.Infow("Cleaned up machines", "count", cleaned)
}
go func() {
for {
time.Sleep(time.Minute)
if reg.shutdown.Load() {
return
}
_, err := db.Exec(ctx, `UPDATE harmony_machines SET last_contact=CURRENT_TIMESTAMP WHERE id=$1`, reg.MachineID)
if err != nil {
logger.Error("Cannot keepalive ", err)
}
}
}()
return &reg, nil
}
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT))
if err != nil {
logger.Warn("unable to delete old machines: ", err)
}
return ct
}
func (res *Reg) Shutdown() {
res.shutdown.Store(true)
}
func getResources() (res Resources, err error) {
b, err := exec.Command(`ps`, `-ef`).CombinedOutput()
if err != nil {
logger.Warn("Could not safety check for 2+ processes: ", err)
} else {
found := 0
for _, b := range bytes.Split(b, []byte("\n")) {
if lotusRE.Match(b) {
found++
}
}
if found > 1 {
logger.Warn("lotus-provider's defaults are for running alone. Use task maximums or CGroups.")
}
}
res = Resources{
Cpu: runtime.NumCPU(),
Ram: memory.FreeMemory(),
GpuRam: getGpuRam(),
}
{ // GPU boolean
gpus, err := ffi.GetGPUDevices()
if err != nil {
logger.Errorf("getting gpu devices failed: %+v", err)
}
all := strings.ToLower(strings.Join(gpus, ","))
if len(gpus) > 1 || strings.Contains(all, "ati") || strings.Contains(all, "nvidia") {
res.Gpu = 1
}
}
return res, nil
}
func getGpuRam() (res []uint64) {
platforms, err := cl.GetPlatforms()
if err != nil {
logger.Error(err)
return res
}
lo.ForEach(platforms, func(p *cl.Platform, i int) {
d, err := p.GetAllDevices()
if err != nil {
logger.Error(err)
return
}
lo.ForEach(d, func(d *cl.Device, i int) {
res = append(res, uint64(d.GlobalMemSize()))
})
})
return res
}
func DiskFree(path string) (uint64, error) {
s := unix.Statfs_t{}
err := unix.Statfs(path, &s)
if err != nil {
return 0, err
}
return s.Bfree * uint64(s.Bsize), nil
}
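
DiskFree above supports the TaskInterface contract's note that CanAccept "is also responsible for determining & reserving disk space (including scratch)". A hedged sketch of that pattern in a consumer package (the task type, scratch path, and threshold are all illustrative):

	type myTask struct{} // hypothetical task type

	// CanAccept refuses work when scratch space runs low; sketch only.
	func (myTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
		free, err := resources.DiskFree("/var/tmp/scratch") // hypothetical scratch dir
		if err != nil {
			return nil, err
		}
		if free < 64<<30 { // want 64 GiB free before taking another task
			return nil, nil // returning a nil TaskID means: not accepting right now
		}
		return &ids[0], nil
	}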