From d17bfb14f8cc2571e20fb8c7eca4bfd2f8f0d514 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Fri, 14 Jul 2023 18:05:49 -0500 Subject: [PATCH] feat: miner deps: harmonydb --- .circleci/config.yml | 53 ++-- .circleci/gen.go | 4 + .circleci/template.yml | 45 +-- build/openrpc/full.json.gz | Bin 33969 -> 33969 bytes build/openrpc/gateway.json.gz | Bin 10425 -> 10425 bytes build/openrpc/miner.json.gz | Bin 15939 -> 15939 bytes build/openrpc/worker.json.gz | Bin 5246 -> 5246 bytes build/version.go | 2 +- documentation/en/cli-lotus-miner.md | 2 +- documentation/en/cli-lotus-worker.md | 2 +- documentation/en/cli-lotus.md | 2 +- .../en/default-lotus-miner-config.toml | 40 +++ extern/filecoin-ffi | 2 +- go.mod | 9 +- go.sum | 21 +- itests/harmonydb_test.go | 173 ++++++++++++ itests/kit/ensemble.go | 29 ++ lib/harmony/harmonydb/doc.go | 35 +++ lib/harmony/harmonydb/harmonydb.go | 266 ++++++++++++++++++ lib/harmony/harmonydb/metrics.go | 77 +++++ lib/harmony/harmonydb/sql/20230706.sql | 6 + lib/harmony/harmonydb/userfuncs.go | 149 ++++++++++ metrics/metrics.go | 79 +++--- node/builder_miner.go | 6 + node/config/def.go | 7 + node/config/doc_gen.go | 46 +++ node/config/types.go | 24 ++ node/impl/storminer.go | 3 + 28 files changed, 993 insertions(+), 89 deletions(-) create mode 100644 itests/harmonydb_test.go create mode 100644 lib/harmony/harmonydb/doc.go create mode 100644 lib/harmony/harmonydb/harmonydb.go create mode 100644 lib/harmony/harmonydb/metrics.go create mode 100644 lib/harmony/harmonydb/sql/20230706.sql create mode 100644 lib/harmony/harmonydb/userfuncs.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 5fcb83145..7abb9d5fc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,17 +6,9 @@ orbs: executors: golang: docker: - # Must match GO_VERSION_MIN in project root + # Must match GO_VERSION_MIN in project root. Change in gen.go - image: cimg/go:1.19.7 resource_class: medium+ - golang-2xl: - docker: - # Must match GO_VERSION_MIN in project root - - image: cimg/go:1.19.7 - resource_class: 2xlarge - ubuntu: - docker: - - image: ubuntu:20.04 commands: build-platform-specific: @@ -96,6 +88,7 @@ commands: git fetch --all install-ubuntu-deps: steps: + - run: sudo apt install curl ca-certificates gnupg - run: sudo apt-get update - run: sudo apt-get install ocl-icd-opencl-dev libhwloc-dev check-go-version: @@ -143,9 +136,9 @@ jobs: Run tests with gotestsum. working_directory: ~/lotus parameters: &test-params - executor: - type: executor - default: golang + resource_class: + type: string + default: medium+ go-test-flags: type: string default: "-timeout 20m" @@ -164,7 +157,14 @@ jobs: type: string default: unit description: Test suite name to report to CircleCI. - executor: << parameters.executor >> + docker: + - image: cimg/go:1.19.7 + environment: + LOTUS_HARMONYDB_HOSTS: yugabyte + - image: yugabytedb/yugabyte:latest + command: bin/yugabyted start --daemon=false + name: yugabyte + resource_class: << parameters.resource_class >> steps: - install-ubuntu-deps - attach_workspace: @@ -182,6 +182,8 @@ jobs: command: | mkdir -p /tmp/test-reports/<< parameters.suite >> mkdir -p /tmp/test-artifacts + dockerize -wait tcp://yugabyte:5433 -timeout 3m + env gotestsum \ --format standard-verbose \ --junitfile /tmp/test-reports/<< parameters.suite >>/junit.xml \ @@ -209,7 +211,9 @@ jobs: Branch on github.com/filecoin-project/test-vectors to checkout and test with. If empty (the default) the commit defined by the git submodule is used. - executor: << parameters.executor >> + docker: + - image: cimg/go:1.19.7 + resource_class: << parameters.resource_class >> steps: - install-ubuntu-deps - attach_workspace: @@ -396,15 +400,14 @@ jobs: Run golangci-lint. working_directory: ~/lotus parameters: - executor: - type: executor - default: golang args: type: string default: '' description: | Arguments to pass to golangci-lint - executor: << parameters.executor >> + docker: + - image: cimg/go:1.19.7 + resource_class: medium+ steps: - install-ubuntu-deps - attach_workspace: @@ -581,7 +584,7 @@ workflows: - build suite: itest-deals_concurrent target: "./itests/deals_concurrent_test.go" - executor: golang-2xl + resource_class: 2xlarge - test: name: test-itest-deals_invalid_utf8_label requires: @@ -774,6 +777,12 @@ workflows: - build suite: itest-get_messages_in_ts target: "./itests/get_messages_in_ts_test.go" + - test: + name: test-itest-harmonydb + requires: + - build + suite: itest-harmonydb + target: "./itests/harmonydb_test.go" - test: name: test-itest-lite_migration requires: @@ -1010,14 +1019,14 @@ workflows: - build suite: itest-wdpost_worker_config target: "./itests/wdpost_worker_config_test.go" - executor: golang-2xl + resource_class: 2xlarge - test: name: test-itest-worker requires: - build suite: itest-worker target: "./itests/worker_test.go" - executor: golang-2xl + resource_class: 2xlarge - test: name: test-itest-worker_upgrade requires: @@ -1044,7 +1053,7 @@ workflows: - build suite: utest-unit-rest target: "./api/... ./blockstore/... ./build/... ./chain/... ./cli/... ./cmd/... ./conformance/... ./extern/... ./gateway/... ./journal/... ./lib/... ./markets/... ./node/... ./paychmgr/... ./storage/... ./tools/..." - executor: golang-2xl + resource_class: 2xlarge - test: name: test-unit-storage requires: diff --git a/.circleci/gen.go b/.circleci/gen.go index 5d951027a..d85b15d90 100644 --- a/.circleci/gen.go +++ b/.circleci/gen.go @@ -10,6 +10,8 @@ import ( "text/template" ) +const GoVersion = "1.19.7" + //go:generate go run ./gen.go .. //go:embed template.yml @@ -109,6 +111,7 @@ func main() { Networks []string ItestFiles []string UnitSuites map[string]string + GoVersion string } in := data{ Networks: []string{"mainnet", "butterflynet", "calibnet", "debug"}, @@ -123,6 +126,7 @@ func main() { } return ret }(), + GoVersion: GoVersion, } out, err := os.Create("./config.yml") diff --git a/.circleci/template.yml b/.circleci/template.yml index cd8aeb663..d8eeb6048 100644 --- a/.circleci/template.yml +++ b/.circleci/template.yml @@ -6,17 +6,9 @@ orbs: executors: golang: docker: - # Must match GO_VERSION_MIN in project root - - image: cimg/go:1.19.7 + # Must match GO_VERSION_MIN in project root. Change in gen.go + - image: cimg/go:[[ .GoVersion]] resource_class: medium+ - golang-2xl: - docker: - # Must match GO_VERSION_MIN in project root - - image: cimg/go:1.19.7 - resource_class: 2xlarge - ubuntu: - docker: - - image: ubuntu:20.04 commands: build-platform-specific: @@ -96,6 +88,7 @@ commands: git fetch --all install-ubuntu-deps: steps: + - run: sudo apt install curl ca-certificates gnupg - run: sudo apt-get update - run: sudo apt-get install ocl-icd-opencl-dev libhwloc-dev check-go-version: @@ -143,9 +136,9 @@ jobs: Run tests with gotestsum. working_directory: ~/lotus parameters: &test-params - executor: - type: executor - default: golang + resource_class: + type: string + default: medium+ go-test-flags: type: string default: "-timeout 20m" @@ -164,7 +157,14 @@ jobs: type: string default: unit description: Test suite name to report to CircleCI. - executor: << parameters.executor >> + docker: + - image: cimg/go:[[ .GoVersion]] + environment: + LOTUS_HARMONYDB_HOSTS: yugabyte + - image: yugabytedb/yugabyte:latest + command: bin/yugabyted start --daemon=false + name: yugabyte + resource_class: << parameters.resource_class >> steps: - install-ubuntu-deps - attach_workspace: @@ -182,6 +182,8 @@ jobs: command: | mkdir -p /tmp/test-reports/<< parameters.suite >> mkdir -p /tmp/test-artifacts + dockerize -wait tcp://yugabyte:5433 -timeout 3m + env gotestsum \ --format standard-verbose \ --junitfile /tmp/test-reports/<< parameters.suite >>/junit.xml \ @@ -209,7 +211,9 @@ jobs: Branch on github.com/filecoin-project/test-vectors to checkout and test with. If empty (the default) the commit defined by the git submodule is used. - executor: << parameters.executor >> + docker: + - image: cimg/go:[[ .GoVersion]] + resource_class: << parameters.resource_class >> steps: - install-ubuntu-deps - attach_workspace: @@ -396,15 +400,14 @@ jobs: Run golangci-lint. working_directory: ~/lotus parameters: - executor: - type: executor - default: golang args: type: string default: '' description: | Arguments to pass to golangci-lint - executor: << parameters.executor >> + docker: + - image: cimg/go:[[ .GoVersion]] + resource_class: medium+ steps: - install-ubuntu-deps - attach_workspace: @@ -543,7 +546,7 @@ workflows: suite: itest-[[ $name ]] target: "./itests/[[ $file ]]" [[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config")]] - executor: golang-2xl + resource_class: 2xlarge [[- end]] [[- if (eq $name "wdpost")]] get-params: true @@ -558,7 +561,7 @@ workflows: suite: utest-[[ $suite ]] target: "[[ $pkgs ]]" [[if eq $suite "unit-cli"]]get-params: true[[end]] - [[- if eq $suite "unit-rest"]]executor: golang-2xl[[end]] + [[- if eq $suite "unit-rest"]]resource_class: 2xlarge[[end]] [[- end]] - test: go-test-flags: "-run=TestMulticoreSDR" diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 6a25cd677273192c7b42c6211511056b637a1a70..340f840afe6d62811f6c44896e95603e0a9adbe3 100644 GIT binary patch delta 23 fcmdnk$+WSPX+j61+s00X77o_d-S$_murUAtZ+!^x delta 23 ecmdnk$+WSPX+j5M+s00X7LJxa|IjN}*cbqH8VOF){nv*?+kixt<_^>0049{33~tl diff --git a/build/version.go b/build/version.go index 2b7cbeda2..56a2cc0ab 100644 --- a/build/version.go +++ b/build/version.go @@ -37,7 +37,7 @@ func BuildTypeString() string { } // BuildVersion is the local build version -const BuildVersion = "1.23.2-dev" +const BuildVersion = "1.23.3-dev" func UserVersion() string { if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" { diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index a6ea67562..ee8f45837 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -7,7 +7,7 @@ USAGE: lotus-miner [global options] command [command options] [arguments...] VERSION: - 1.23.2-dev + 1.23.3-dev COMMANDS: init Initialize a lotus miner repo diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index 9c749f44a..bdf992e58 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -7,7 +7,7 @@ USAGE: lotus-worker [global options] command [command options] [arguments...] VERSION: - 1.23.2-dev + 1.23.3-dev COMMANDS: run Start lotus worker diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index 751182a43..fe63acbc6 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -7,7 +7,7 @@ USAGE: lotus [global options] command [command options] [arguments...] VERSION: - 1.23.2-dev + 1.23.3-dev COMMANDS: daemon Start a lotus daemon process diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index c0a204bf1..0efb22ff0 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -894,3 +894,43 @@ #GCInterval = "1m0s" +[HarmonyDB] + # HOSTS is a list of hostnames to nodes running YugabyteDB + # in a cluster. Only 1 is required + # + # type: []string + # env var: LOTUS_HARMONYDB_HOSTS + #Hosts = ["127.0.0.1"] + + # The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. + # + # type: string + # env var: LOTUS_HARMONYDB_USERNAME + #Username = "yugabyte" + + # The password for the related username. Blank for default. + # + # type: string + # env var: LOTUS_HARMONYDB_PASSWORD + #Password = "yugabyte" + + # The database (logical partition) within Yugabyte. Blank for default. + # + # type: string + # env var: LOTUS_HARMONYDB_DATABASE + #Database = "yugabyte" + + # The port to find Yugabyte. Blank for default. + # + # type: string + # env var: LOTUS_HARMONYDB_PORT + #Port = "5433" + + # ITest is for optimized integration testing and not + # for production. Blank for default production configuration. + # + # type: string + # env var: LOTUS_HARMONYDB_ITEST + #ITest = "" + + diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index de34caff9..a458f638e 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit de34caff946d598edb299566d951b44b9b7f7dd4 +Subproject commit a458f638e3c8603c9b5a9ed9847c3af4597e46d4 diff --git a/go.mod b/go.mod index ab1f60b6e..2da784ad6 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/filecoin-project/test-vectors/schema v0.0.5 github.com/gbrlsnchs/jwt/v3 v3.0.1 github.com/gdamore/tcell/v2 v2.2.0 + github.com/georgysavva/scany/v2 v2.0.0 github.com/go-openapi/spec v0.19.11 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 @@ -102,6 +103,7 @@ require ( github.com/ipld/go-ipld-selector-text-lite v0.0.1 github.com/ipni/go-libipni v0.0.8 github.com/ipni/index-provider v0.12.0 + github.com/jackc/pgx/v5 v5.4.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/koalacxr/quantile v0.0.1 github.com/libp2p/go-buffer-pool v0.1.0 @@ -127,7 +129,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/polydawn/refmt v0.89.0 - github.com/prometheus/client_golang v1.14.0 + github.com/prometheus/client_golang v1.16.0 github.com/puzpuzpuz/xsync/v2 v2.4.0 github.com/raulk/clock v1.1.0 github.com/raulk/go-watchdog v1.3.0 @@ -243,6 +245,9 @@ require ( github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -291,7 +296,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-19 v0.3.2 // indirect diff --git a/go.sum b/go.sum index 71341203d..ebbc4dcc8 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,7 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB05Ql//KWfeTFs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA= @@ -391,6 +392,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= github.com/gdamore/tcell/v2 v2.2.0 h1:vSyEgKwraXPSOkvCk7IwOSyX+Pv3V2cV9CikJMXg4U4= github.com/gdamore/tcell/v2 v2.2.0/go.mod h1:cTTuF84Dlj/RqmaCIV5p4w8uG1zWdk0SF6oBpwHp4fU= +github.com/georgysavva/scany/v2 v2.0.0 h1:RGXqxDv4row7/FYoK8MRXAZXqoWF/NM+NP0q50k3DKU= +github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7K/Zhyz8vWo38= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= @@ -465,6 +468,7 @@ github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -867,6 +871,14 @@ github.com/ipni/index-provider v0.12.0 h1:R3F6dxxKNv4XkE4GJZNLOG0bDEbBQ/S5iztXwS github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.4.1 h1:oKfB/FhuVtit1bBM3zNRRsZ925ZkMN3HXL+LgLUM9lE= +github.com/jackc/pgx/v5 v5.4.1/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= +github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -958,6 +970,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= @@ -1426,8 +1439,8 @@ github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqr github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= -github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= -github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -1461,8 +1474,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI= github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag= diff --git a/itests/harmonydb_test.go b/itests/harmonydb_test.go new file mode 100644 index 000000000..9e09691da --- /dev/null +++ b/itests/harmonydb_test.go @@ -0,0 +1,173 @@ +package itests + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "testing" + + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/node/impl" +) + +func withSetup(t *testing.T, f func(*kit.TestMiner)) { + _, miner, _ := kit.EnsembleMinimal(t, + kit.LatestActorsAt(-1), + kit.MockProofs(), + ) + + f(miner) +} + +func TestCrud(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + withSetup(t, func(miner *kit.TestMiner) { + cdb := miner.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + err := cdb.Exec(ctx, ` + INSERT INTO + itest_scratch (some_int, content) + VALUES + (11, 'cows'), + (5, 'cats') + `) + if err != nil { + t.Fatal("Could not insert: ", err) + } + var ints []struct { + Count int `db:"some_int"` + Animal string `db:"content"` + Unpopulated int + } + err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch") + if err != nil { + t.Fatal("Could not select: ", err) + } + if len(ints) != 2 { + t.Fatal("unexpected count of returns. Want 2, Got ", len(ints)) + } + if ints[0].Count != 11 || ints[1].Count != 5 { + t.Fatal("expected [11,5] got ", ints) + } + if ints[0].Animal != "cows" || ints[1].Animal != "cats" { + t.Fatal("expected, [cows, cats] ", ints) + } + fmt.Println("test completed") + }) +} + +func TestTransaction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + withSetup(t, func(miner *kit.TestMiner) { + cdb := miner.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + if err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil { + t.Fatal("E0", err) + } + err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Transaction) (commit bool) { + if err := tx.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil { + t.Fatal("E1", err) + } + + // sum1 is read from OUTSIDE the transaction so it's the old value + var sum1 int + if err := cdb.QueryRow(ctx, "SELECT SUM(some_int) FROM itest_scratch").Scan(&sum1); err != nil { + t.Fatal("E2", err) + } + if sum1 != 4+5+6 { + t.Fatal("Expected 15, got ", sum1) + } + + // sum2 is from INSIDE the transaction, so the updated value. + var sum2 int + if err := tx.QueryRow(ctx, "SELECT SUM(some_int) FROM itest_scratch").Scan(&sum2); err != nil { + t.Fatal("E3", err) + } + if sum2 != 4+5+6+7+8+9 { + t.Fatal("Expected 39, got ", sum2) + } + return false // rollback + }) + if err != nil { + t.Fatal("ET", err) + } + + var sum2 int + // Query() example (yes, QueryRow would be preferred here) + q, err := cdb.Query(ctx, "SELECT SUM(some_int) FROM itest_scratch") + if err != nil { + t.Fatal("E4", err) + } + defer q.Close() + var rowCt int + for q.Next() { + err := q.Scan(&sum2) + if err != nil { + t.Fatal("error scanning ", err) + } + rowCt++ + } + if sum2 != 4+5+6 { + t.Fatal("Expected 15, got ", sum2) + } + if rowCt != 1 { + t.Fatal("unexpected count of rows") + } + }) +} + +func TestPartialWalk(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + withSetup(t, func(miner *kit.TestMiner) { + cdb := miner.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + if err := cdb.Exec(ctx, ` + INSERT INTO + itest_scratch (content, some_int) + VALUES + ('andy was here', 5), + ('lotus is awesome', 6), + ('hello world', 7), + ('3rd integration test', 8), + ('fiddlesticks', 9) + `); err != nil { + t.Fatal("e1", err) + } + + // TASK: FIND THE ID of the string with a specific SHA256 + needle := "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" + q, err := cdb.Query(ctx, `SELECT id, content FROM itest_scratch`) + if err != nil { + t.Fatal("e2", err) + } + defer q.Close() + + var tmp struct { + Src string `db:"content"` + ID int + } + + var done bool + for q.Next() { + + if err := q.StructScan(&tmp); err != nil { + t.Fatal("structscan err " + err.Error()) + } + + bSha := sha256.Sum256([]byte(tmp.Src)) + if hex.EncodeToString(bSha[:]) == needle { + done = true + break + } + } + if !done { + t.Fatal("We didn't find it.") + } + // Answer: tmp.ID + }) +} diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 19cc163af..04a02cf61 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -49,11 +49,13 @@ import ( "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/genesis" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test" lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" @@ -357,6 +359,8 @@ func (n *Ensemble) Start() *Ensemble { n.mn = mocknet.New() } + sharedITestID := harmonydb.ITestNewID() + // --------------------- // FULL NODES // --------------------- @@ -722,6 +726,17 @@ func (n *Ensemble) Start() *Ensemble { // upgrades node.Override(new(stmgr.UpgradeSchedule), n.options.upgradeSchedule), + + node.Override(new(harmonydb.ITestID), sharedITestID), + node.Override(new(config.HarmonyDB), func() config.HarmonyDB { + return config.HarmonyDB{ + Hosts: []string{envElse("LOTUS_HARMONYDB_HOSTS", "127.0.0.1")}, + Database: "yugabyte", + Username: "yugabyte", + Password: "yugabyte", + Port: "5433", + } + }), } if m.options.subsystems.Has(SMarkets) { @@ -768,6 +783,10 @@ func (n *Ensemble) Start() *Ensemble { require.NoError(n.t, err) n.t.Cleanup(func() { _ = stop(context.Background()) }) + mCopy := m + n.t.Cleanup(func() { + mCopy.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB.ITestDeleteAll() + }) m.BaseAPI = m.StorageMiner @@ -824,6 +843,8 @@ func (n *Ensemble) Start() *Ensemble { auth := http.Header(nil) + // FUTURE: Use m.MinerNode.(BaseAPI).(impl.StorageMinerAPI).HarmonyDB to setup. + remote := paths.NewRemote(localStore, m.MinerNode, auth, 20, &paths.DefaultPartialFileHandler{}) store := m.options.workerStorageOpt(remote) @@ -853,6 +874,7 @@ func (n *Ensemble) Start() *Ensemble { require.NoError(n.t, err) n.active.workers = append(n.active.workers, m) + } // If we are here, we have processed all inactive workers and moved them @@ -1065,3 +1087,10 @@ func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.Metad size := binary.PutUvarint(buf, uint64(maxSectorID)) return mds.Put(ctx, datastore.NewKey(pipeline.StorageCounterDSPrefix), buf[:size]) } + +func envElse(env, els string) string { + if v := os.Getenv(env); v != "" { + return v + } + return els +} diff --git a/lib/harmony/harmonydb/doc.go b/lib/harmony/harmonydb/doc.go new file mode 100644 index 000000000..ac60a0260 --- /dev/null +++ b/lib/harmony/harmonydb/doc.go @@ -0,0 +1,35 @@ +/* +# HarmonyDB provides database abstractions over SP-wide Postgres-compatible instance(s). + +# Features + + Rolling to secondary database servers on connection failure + Convenience features for Go + SQL + Prevention of SQL injection vulnerabilities + Monitors behavior via Prometheus stats and logging of errors. + +# Usage + +Processes should use New() to instantiate a *DB and keep it. +Consumers can use this *DB concurrently. +Creating and changing tables & views should happen in ./sql/ folder. +Name the file "today's date" in the format: YYYYMMDD.sql (ex: 20231231.sql for the year's last day) + + a. CREATE TABLE should NOT have a schema: + GOOD: CREATE TABLE foo (); + BAD: CREATE TABLE me.foo (); + b. Schema is managed for you. It provides isolation for integraton tests & multi-use. + c. Git Merges: All run once, so old-after-new is OK when there are no deps. + d. NEVER change shipped sql files. Have later files make corrections. + e. Anything not ran will be ran, so an older date making it to master is OK. + +Write SQL with context, raw strings, and args: + + name := "Alice" + var ID int + err := QueryRow(ctx, "SELECT id from people where first_name=?", name).Scan(&ID) + fmt.Println(ID) + +Note: Scan() is column-oriented, while Select() & StructScan() is field name/tag oriented. +*/ +package harmonydb diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go new file mode 100644 index 000000000..702a97681 --- /dev/null +++ b/lib/harmony/harmonydb/harmonydb.go @@ -0,0 +1,266 @@ +package harmonydb + +import ( + "context" + "embed" + "errors" + "fmt" + "math/rand" + "regexp" + "sort" + "strconv" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/filecoin-project/lotus/node/config" +) + +type ITestID string + +// ItestNewID see ITestWithID doc +func ITestNewID() ITestID { + return ITestID(strconv.Itoa(rand.Intn(99999))) +} + +type DB struct { + pgx *pgxpool.Pool + cfg *pgxpool.Config + schema string + hostnames []string + log func(string) +} + +var logger = logging.Logger("harmonydb") + +// NewFromConfig is a convenience function. +// In usage: +// +// db, err := NewFromConfig(config.HarmonyDB) // in binary init +func NewFromConfig(cfg config.HarmonyDB) (*DB, error) { + return New( + cfg.Hosts, + cfg.Username, + cfg.Password, + cfg.Database, + cfg.Port, + "", + func(s string) { logger.Error(s) }, + ) +} + +func NewFromConfigWithITestID(cfg config.HarmonyDB) func(id ITestID) (*DB, error) { + return func(id ITestID) (*DB, error) { + return New( + cfg.Hosts, + cfg.Username, + cfg.Password, + cfg.Database, + cfg.Port, + id, + func(s string) { logger.Error(s) }, + ) + } +} + +// New is to be called once per binary to establish the pool. +// log() is for errors. It returns an upgraded database's connection. +// This entry point serves both production and integration tests, so it's more DI. +func New(hosts []string, username, password, database, port string, itestID ITestID, log func(string)) (*DB, error) { + itest := string(itestID) + connString := "" + if len(hosts) > 0 { + connString = "host=" + hosts[0] + " " + } + for k, v := range map[string]string{"user": username, "password": password, "dbname": database, "port": port} { + if strings.TrimSpace(v) != "" { + connString += k + "=" + v + " " + } + } + + schema := "lotus" + if itest != "" { + schema = "itest_" + itest + } + + if err := ensureSchemaExists(connString, schema); err != nil { + return nil, err + } + cfg, err := pgxpool.ParseConfig(connString + "search_path=" + schema) + if err != nil { + return nil, err + } + + // enable multiple fallback hosts. + for _, h := range hosts[1:] { + cfg.ConnConfig.Fallbacks = append(cfg.ConnConfig.Fallbacks, &pgconn.FallbackConfig{Host: h}) + } + + cfg.ConnConfig.OnNotice = func(conn *pgconn.PgConn, n *pgconn.Notice) { + log("database notice: " + n.Message + ": " + n.Detail) + DBMeasures.Errors.M(1) + } + + db := DB{cfg: cfg, schema: schema, hostnames: hosts, log: log} // pgx populated in AddStatsAndConnect + if err := db.addStatsAndConnect(); err != nil { + return nil, err + } + + return &db, db.upgrade() +} + +type tracer struct { +} + +type ctxkey string + +var sqlStart = ctxkey("sqlStart") + +func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context { + return context.WithValue(ctx, sqlStart, time.Now()) +} +func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) { + DBMeasures.Hits.M(1) + ms := time.Since(ctx.Value(sqlStart).(time.Time)).Milliseconds() + DBMeasures.TotalWait.M(ms) + DBMeasures.Waits.Observe(float64(ms)) + if data.Err != nil { + DBMeasures.Errors.M(1) + } + // Can log what type of query it is, but not what tables + // Can log rows affected. +} + +// addStatsAndConnect connects a prometheus logger. Be sure to run this before using the DB. +func (db *DB) addStatsAndConnect() error { + + db.cfg.ConnConfig.Tracer = tracer{} + + hostnameToIndex := map[string]float64{} + for i, h := range db.hostnames { + hostnameToIndex[h] = float64(i) + } + db.cfg.AfterConnect = func(ctx context.Context, c *pgx.Conn) error { + s := db.pgx.Stat() + DBMeasures.OpenConnections.M(int64(s.TotalConns())) + DBMeasures.WhichHost.Observe(hostnameToIndex[c.Config().Host]) + + //FUTURE place for any connection seasoning + return nil + } + + var err error + db.pgx, err = pgxpool.NewWithConfig(context.Background(), db.cfg) + if err != nil { + db.log(fmt.Sprintf("Unable to connect to database: %v\n", err)) + return err + } + return nil +} + +// ITestDeleteAll will delete everything created for "this" integration test. +// This must be called at the end of each integration test. +func (db *DB) ITestDeleteAll() { + if !strings.HasPrefix(db.schema, "itest_") { + fmt.Println("Warning: this should never be called on anything but an itest schema.") + return + } + defer db.pgx.Close() + _, err := db.pgx.Exec(context.Background(), "DROP SCHEMA "+db.schema+" CASCADE") + if err != nil { + fmt.Println("warning: unclean itest shutdown: cannot delete schema: " + err.Error()) + return + } +} + +var schemaREString = "^[A-Za-z0-9_]+$" +var schemaRE = regexp.MustCompile(schemaREString) + +func ensureSchemaExists(connString, schema string) error { + // FUTURE allow using fallback DBs for start-up. + p, err := pgx.Connect(context.Background(), connString) + if err != nil { + return err + } + defer func() { _ = p.Close(context.Background()) }() + + if len(schema) < 5 || !schemaRE.MatchString(schema) { + return errors.New("schema must be of the form " + schemaREString + "\n Got: " + schema) + } + _, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema) + if err != nil { + return fmt.Errorf("cannot create schema: %w", err) + } + return nil +} + +//go:embed sql +var fs embed.FS + +func (db *DB) upgrade() error { + // Does the version table exist? if not, make it. + // NOTE: This cannot change except via the next sql file. + err := db.Exec(context.Background(), `CREATE TABLE IF NOT EXISTS base ( + id SERIAL PRIMARY KEY, + entry CHAR(12), + applied TIMESTAMP DEFAULT current_timestamp + )`) + if err != nil { + db.log("Upgrade failed.") + return err + } + + // __Run scripts in order.__ + + landed := map[string]bool{} + { + var landedEntries []struct{ Entry string } + err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base") + if err != nil { + db.log("Cannot read entries: " + err.Error()) + return err + } + for _, l := range landedEntries { + landed[l.Entry] = true + } + } + dir, err := fs.ReadDir("sql") + if err != nil { + db.log("Cannot read fs entries: " + err.Error()) + return err + } + sort.Slice(dir, func(i, j int) bool { return dir[i].Name() < dir[j].Name() }) + for _, e := range dir { + name := e.Name() + if landed[name] || !strings.HasSuffix(name, ".sql") { + continue + } + file, err := fs.ReadFile("sql/" + name) + if err != nil { + db.log("weird embed file read err") + return err + } + for _, s := range strings.Split(string(file), ";") { // Implement the changes. + if len(strings.TrimSpace(s)) == 0 { + continue + } + _, err = db.pgx.Exec(context.Background(), s) + if err != nil { + db.log(fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())) + return err + } + } + + // Mark Completed. + err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name) + if err != nil { + db.log("Cannot update base: " + err.Error()) + return fmt.Errorf("cannot insert into base: %w", err) + } + } + return nil +} diff --git a/lib/harmony/harmonydb/metrics.go b/lib/harmony/harmonydb/metrics.go new file mode 100644 index 000000000..b29a76ad5 --- /dev/null +++ b/lib/harmony/harmonydb/metrics.go @@ -0,0 +1,77 @@ +package harmonydb + +import ( + "github.com/prometheus/client_golang/prometheus" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "github.com/filecoin-project/lotus/metrics" +) + +var ( + dbTag, _ = tag.NewKey("db_name") + pre = "harmonydb_base_" + waitsBuckets = []float64{0, 10, 20, 30, 50, 80, 130, 210, 340, 550, 890} + whichHostBuckets = []float64{0, 1, 2, 3, 4, 5} +) + +// DBMeasures groups all db metrics. +var DBMeasures = struct { + Hits *stats.Int64Measure + TotalWait *stats.Int64Measure + Waits prometheus.Histogram + OpenConnections *stats.Int64Measure + Errors *stats.Int64Measure + WhichHost prometheus.Histogram +}{ + Hits: stats.Int64(pre+"hits", "Total number of uses.", stats.UnitDimensionless), + TotalWait: stats.Int64(pre+"total_wait", "Total delay. A numerator over hits to get average wait.", stats.UnitMilliseconds), + Waits: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: pre + "waits", + Buckets: waitsBuckets, + Help: "The histogram of waits for query completions.", + }), + OpenConnections: stats.Int64(pre+"open_connections", "Total connection count.", stats.UnitDimensionless), + Errors: stats.Int64(pre+"errors", "Total error count.", stats.UnitDimensionless), + WhichHost: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: pre + "which_host", + Buckets: whichHostBuckets, + Help: "The index of the hostname being used", + }), +} + +// CacheViews groups all cache-related default views. +func init() { + metrics.RegisterViews( + &view.View{ + Measure: DBMeasures.Hits, + Aggregation: view.Sum(), + TagKeys: []tag.Key{dbTag}, + }, + &view.View{ + Measure: DBMeasures.TotalWait, + Aggregation: view.Sum(), + TagKeys: []tag.Key{dbTag}, + }, + &view.View{ + Measure: DBMeasures.OpenConnections, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{dbTag}, + }, + &view.View{ + Measure: DBMeasures.Errors, + Aggregation: view.Sum(), + TagKeys: []tag.Key{dbTag}, + }, + ) + err := prometheus.Register(DBMeasures.Waits) + if err != nil { + panic(err) + } + + err = prometheus.Register(DBMeasures.WhichHost) + if err != nil { + panic(err) + } +} diff --git a/lib/harmony/harmonydb/sql/20230706.sql b/lib/harmony/harmonydb/sql/20230706.sql new file mode 100644 index 000000000..b45aca7fa --- /dev/null +++ b/lib/harmony/harmonydb/sql/20230706.sql @@ -0,0 +1,6 @@ +CREATE TABLE itest_scratch ( + id SERIAL PRIMARY KEY, + content TEXT, + some_int INTEGER, + update_time TIMESTAMP DEFAULT current_timestamp +) \ No newline at end of file diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go new file mode 100644 index 000000000..8eebdd607 --- /dev/null +++ b/lib/harmony/harmonydb/userfuncs.go @@ -0,0 +1,149 @@ +package harmonydb + +import ( + "context" + + "github.com/georgysavva/scany/v2/pgxscan" + "github.com/jackc/pgx/v5" +) + +type Intf interface { + ITestDeleteAll() + Exec(ctx context.Context, sql rawStringOnly, arguments ...any) error + Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) + QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row + Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error + BeginTransaction(ctx context.Context, f func(t *Transaction) (commit bool)) (retErr error) +} + +// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. +// In any package, raw strings will satisfy compilation. Ex: +// +// harmonydb.Exec("INSERT INTO version (number) VALUES (1)") +// +// This prevents SQL injection attacks where the input contains query fragments. +type rawStringOnly string + +// Exec executes changes (INSERT, DELETE, or UPDATE). +// Note, for CREATE & DROP please keep these permanent and express +// them in the ./sql/ files (next number). +func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) error { + _, err := db.pgx.Exec(ctx, string(sql), arguments...) + return err +} + +type Qry interface { + Next() bool + Err() error + Close() + Scan(...any) error + Values() ([]any, error) +} + +// Query offers Next/Err/Close/Scan/Values/StructScan +type Query struct { + Qry +} + +// Query allows iterating returned values to save memory consumption +// with the downside of needing to `defer q.Close()`. For a simpler interface, +// try Select() +// Next() must be called to advance the row cursor, including the first time: +// Ex: +// q, err := db.Query(ctx, "SELECT id, name FROM users") +// handleError(err) +// defer q.Close() +// +// for q.Next() { +// var id int +// var name string +// handleError(q.Scan(&id, &name)) +// fmt.Println(id, name) +// } +func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { + q, err := db.pgx.Query(ctx, string(sql), arguments...) + return &Query{q}, err +} +func (q *Query) StructScan(s any) error { + return pgxscan.ScanRow(s, q.Qry.(pgx.Rows)) +} + +type Row interface { + Scan(...any) error +} + +// QueryRow gets 1 row using column order matching. +// This is a timesaver for the special case of wanting the first row returned only. +// EX: +// +// var name, pet string +// var ID = 123 +// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet) +func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row { + return db.pgx.QueryRow(ctx, string(sql), arguments...) +} + +/* +Select multiple rows into a slice using name matching +Ex: + + type user struct { + Name string + ID int + Number string `db:"tel_no"` + } + + var users []user + pet := "cat" + err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet) +*/ +func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { + return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) +} + +type Transaction struct { + pgx.Tx +} + +// BeginTransaction is how you can access transactions using this library. +// The entire transaction happens in the function passed in. +// The return must be true or a rollback will occur. +func (db *DB) BeginTransaction(ctx context.Context, f func(t *Transaction) (commit bool)) (retErr error) { + tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + var commit bool + defer func() { // Panic clean-up. + if !commit { + retErr = tx.Rollback(ctx) + } + }() + commit = f(&Transaction{tx}) + if commit { + return tx.Commit(ctx) + } + return nil +} + +// Exec in a transaction. +func (t *Transaction) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) error { + _, err := t.Tx.Exec(ctx, string(sql), arguments...) + return err +} + +// Query in a transaction. +func (t *Transaction) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { + q, err := t.Tx.Query(ctx, string(sql), arguments...) + return &Query{q}, err +} + +// QueryRow in a transaction. +func (t *Transaction) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row { + return t.Tx.QueryRow(ctx, string(sql), arguments...) +} + +// Select in a transaction. +func (t *Transaction) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { + return pgxscan.Select(ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...) +} diff --git a/metrics/metrics.go b/metrics/metrics.go index ee7bd8695..b1c241b21 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -695,46 +695,55 @@ var ( } ) +var views = []*view.View{ + InfoView, + PeerCountView, + APIRequestDurationView, + + GraphsyncReceivingPeersCountView, + GraphsyncReceivingActiveCountView, + GraphsyncReceivingCountCountView, + GraphsyncReceivingTotalMemoryAllocatedView, + GraphsyncReceivingTotalPendingAllocationsView, + GraphsyncReceivingPeersPendingView, + GraphsyncSendingPeersCountView, + GraphsyncSendingActiveCountView, + GraphsyncSendingCountCountView, + GraphsyncSendingTotalMemoryAllocatedView, + GraphsyncSendingTotalPendingAllocationsView, + GraphsyncSendingPeersPendingView, + + RcmgrAllowConnView, + RcmgrBlockConnView, + RcmgrAllowStreamView, + RcmgrBlockStreamView, + RcmgrAllowPeerView, + RcmgrBlockPeerView, + RcmgrAllowProtoView, + RcmgrBlockProtoView, + RcmgrBlockProtoPeerView, + RcmgrAllowSvcView, + RcmgrBlockSvcView, + RcmgrBlockSvcPeerView, + RcmgrAllowMemView, + RcmgrBlockMemView, +} + // DefaultViews is an array of OpenCensus views for metric gathering purposes var DefaultViews = func() []*view.View { - views := []*view.View{ - InfoView, - PeerCountView, - APIRequestDurationView, - - GraphsyncReceivingPeersCountView, - GraphsyncReceivingActiveCountView, - GraphsyncReceivingCountCountView, - GraphsyncReceivingTotalMemoryAllocatedView, - GraphsyncReceivingTotalPendingAllocationsView, - GraphsyncReceivingPeersPendingView, - GraphsyncSendingPeersCountView, - GraphsyncSendingActiveCountView, - GraphsyncSendingCountCountView, - GraphsyncSendingTotalMemoryAllocatedView, - GraphsyncSendingTotalPendingAllocationsView, - GraphsyncSendingPeersPendingView, - - RcmgrAllowConnView, - RcmgrBlockConnView, - RcmgrAllowStreamView, - RcmgrBlockStreamView, - RcmgrAllowPeerView, - RcmgrBlockPeerView, - RcmgrAllowProtoView, - RcmgrBlockProtoView, - RcmgrBlockProtoPeerView, - RcmgrAllowSvcView, - RcmgrBlockSvcView, - RcmgrBlockSvcPeerView, - RcmgrAllowMemView, - RcmgrBlockMemView, - } - views = append(views, blockstore.DefaultViews...) - views = append(views, rpcmetrics.DefaultViews...) return views }() +// RegisterViews adds views to the default list without modifying this file. +func RegisterViews(v ...*view.View) { + views = append(views, v...) +} + +func init() { + RegisterViews(blockstore.DefaultViews...) + RegisterViews(rpcmetrics.DefaultViews...) +} + var ChainNodeViews = append([]*view.View{ ChainNodeHeightView, ChainNodeHeightExpectedView, diff --git a/node/builder_miner.go b/node/builder_miner.go index d9c92422d..bd81f4265 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen/slashfilter" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/markets/dealfilter" "github.com/filecoin-project/lotus/markets/idxprov" @@ -230,7 +231,12 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(config.SealerConfig), cfg.Storage), Override(new(config.ProvingConfig), cfg.Proving), + Override(new(config.HarmonyDB), cfg.HarmonyDB), + Override(new(harmonydb.ITestID), harmonydb.ITestID("")), Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)), + Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) { + return harmonydb.NewFromConfigWithITestID(cfg)(id) + }), ) } diff --git a/node/config/def.go b/node/config/def.go index 42b035c66..47d314fc0 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -269,6 +269,13 @@ func DefaultStorageMiner() *StorageMiner { MaxConcurrentUnseals: 5, GCInterval: Duration(1 * time.Minute), }, + HarmonyDB: HarmonyDB{ + Hosts: []string{"127.0.0.1"}, + Username: "yugabyte", + Password: "yugabyte", + Database: "yugabyte", + Port: "5433", + }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 5361b2d6c..c513eecbc 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -468,6 +468,46 @@ Set to 0 to keep all mappings`, Comment: ``, }, }, + "HarmonyDB": []DocField{ + { + Name: "Hosts", + Type: "[]string", + + Comment: `HOSTS is a list of hostnames to nodes running YugabyteDB +in a cluster. Only 1 is required`, + }, + { + Name: "Username", + Type: "string", + + Comment: `The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default.`, + }, + { + Name: "Password", + Type: "string", + + Comment: `The password for the related username. Blank for default.`, + }, + { + Name: "Database", + Type: "string", + + Comment: `The database (logical partition) within Yugabyte. Blank for default.`, + }, + { + Name: "Port", + Type: "string", + + Comment: `The port to find Yugabyte. Blank for default.`, + }, + { + Name: "ITest", + Type: "string", + + Comment: `ITest is for optimized integration testing and not +for production. Blank for default production configuration.`, + }, + }, "IndexConfig": []DocField{ { Name: "EnableMsgIndex", @@ -1386,6 +1426,12 @@ HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer`, Name: "DAGStore", Type: "DAGStoreConfig", + Comment: ``, + }, + { + Name: "HarmonyDB", + Type: "HarmonyDB", + Comment: ``, }, }, diff --git a/node/config/types.go b/node/config/types.go index c89e8f70b..e2d1e655d 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -61,6 +61,8 @@ type StorageMiner struct { Fees MinerFeeConfig Addresses MinerAddressConfig DAGStore DAGStoreConfig + + HarmonyDB HarmonyDB } type DAGStoreConfig struct { @@ -732,3 +734,25 @@ type IndexConfig struct { // EnableMsgIndex enables indexing of messages on chain. EnableMsgIndex bool } + +type HarmonyDB struct { + // HOSTS is a list of hostnames to nodes running YugabyteDB + // in a cluster. Only 1 is required + Hosts []string + + // The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. + Username string + + // The password for the related username. Blank for default. + Password string + + // The database (logical partition) within Yugabyte. Blank for default. + Database string + + // The port to find Yugabyte. Blank for default. + Port string + + // ITest is for optimized integration testing and not + // for production. Blank for default production configuration. + ITest string +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index e4fa41c78..4932e0504 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -45,6 +45,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" mktsdagstore "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/miner" @@ -122,6 +123,8 @@ type StorageMinerAPI struct { GetSealingConfigFunc dtypes.GetSealingConfigFunc `optional:"true"` GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc `optional:"true"` SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"` + + HarmonyDB *harmonydb.DB } var _ api.StorageMiner = &StorageMinerAPI{}