From fb6fdfcc135d64da92ecd00f63a85f278dfb367f Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 6 May 2020 16:16:56 -0500 Subject: [PATCH 1/3] separate db conneciton pools for each process --- .../00020_create_btc_header_cids_table.sql | 2 +- pkg/super_node/backfiller.go | 6 ++--- pkg/super_node/config.go | 24 ++++++++++++------- pkg/super_node/service.go | 10 ++++---- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/db/migrations/00020_create_btc_header_cids_table.sql b/db/migrations/00020_create_btc_header_cids_table.sql index 9d2c1553..04a6f65e 100644 --- a/db/migrations/00020_create_btc_header_cids_table.sql +++ b/db/migrations/00020_create_btc_header_cids_table.sql @@ -7,7 +7,7 @@ CREATE TABLE btc.header_cids ( cid TEXT NOT NULL, timestamp NUMERIC NOT NULL, bits BIGINT NOT NULL, - node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, + node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, UNIQUE (block_number, block_hash) ); diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 48a89110..712a0425 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -69,11 +69,11 @@ type BackFillService struct { // NewBackFillService returns a new BackFillInterface func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { - publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode) if err != nil { return nil, err } - indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert if err != nil { return nil, err } - retriever, err := NewCIDRetriever(settings.Chain, settings.DB) + retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn) if err != nil { return nil, err } diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 2bb83cc7..84582a80 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -53,21 +53,23 @@ type Config struct { Chain shared.ChainType IPFSPath string IPFSMode shared.IPFSMode - DB *postgres.DB DBConfig config.Database Quit chan bool // Server fields Serve bool + ServeDBConn *postgres.DB WSEndpoint string HTTPEndpoint string IPCEndpoint string // Sync params - Sync bool - Workers int - WSClient interface{} - NodeInfo core.Node + Sync bool + SyncDBConn *postgres.DB + Workers int + WSClient interface{} + NodeInfo core.Node // Backfiller params BackFill bool + BackFillDBConn *postgres.DB HTTPClient interface{} Frequency time.Duration BatchSize uint64 @@ -110,6 +112,8 @@ func NewSuperNodeConfig() (*Config, error) { } } + c.DBConfig.Init() + c.Sync = viper.GetBool("superNode.sync") if c.Sync { workers := viper.GetInt("superNode.workers") @@ -128,6 +132,8 @@ func NewSuperNodeConfig() (*Config, error) { btcWS := viper.GetString("bitcoin.wsPath") c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS) } + syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.SyncDBConn = &syncDB } c.Serve = viper.GetBool("superNode.server") @@ -151,6 +157,8 @@ func NewSuperNodeConfig() (*Config, error) { httpPath = "127.0.0.1:8081" } c.HTTPEndpoint = httpPath + serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.ServeDBConn = &serveDB } c.BackFill = viper.GetBool("superNode.backFill") @@ -160,9 +168,6 @@ func NewSuperNodeConfig() (*Config, error) { } } - c.DBConfig.Init() - db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) - c.DB = &db c.Quit = make(chan bool) return c, nil @@ -209,5 +214,8 @@ func (c *Config) BackFillFields() error { c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) c.ValidationLevel = viper.GetInt("superNode.validationLevel") + + backFillDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.BackFillDBConn = &backFillDB return nil } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index c952a2db..1f09ea50 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) { if err != nil { return nil, err } - sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode) if err != nil { return nil, err } - sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode) if err != nil { return nil, err } @@ -124,14 +124,15 @@ func NewSuperNode(settings *Config) (SuperNode, error) { } // If we are serving, initialize the needed interfaces if settings.Serve { - sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.DB) + sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn) if err != nil { return nil, err } - sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode) if err != nil { return nil, err } + sn.db = settings.ServeDBConn } sn.QuitChan = settings.Quit sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) @@ -140,7 +141,6 @@ func NewSuperNode(settings *Config) (SuperNode, error) { sn.NodeInfo = &settings.NodeInfo sn.ipfsPath = settings.IPFSPath sn.chain = settings.Chain - sn.db = settings.DB return sn, nil } From 8dc31e4ca5fb3e3d5435827d172cdbd0dca8ac75 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 11 May 2020 10:50:05 -0500 Subject: [PATCH 2/3] make connection pool params configurable --- pkg/config/database.go | 33 +++++++++++++++++++++++---------- pkg/postgres/postgres.go | 12 ++++++++++++ 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pkg/config/database.go b/pkg/config/database.go index 57b16faa..5b868188 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -24,19 +24,25 @@ import ( // Env variables const ( - DATABASE_NAME = "DATABASE_NAME" - DATABASE_HOSTNAME = "DATABASE_HOSTNAME" - DATABASE_PORT = "DATABASE_PORT" - DATABASE_USER = "DATABASE_USER" - DATABASE_PASSWORD = "DATABASE_PASSWORD" + DATABASE_NAME = "DATABASE_NAME" + DATABASE_HOSTNAME = "DATABASE_HOSTNAME" + DATABASE_PORT = "DATABASE_PORT" + DATABASE_USER = "DATABASE_USER" + DATABASE_PASSWORD = "DATABASE_PASSWORD" + DATABASE_MAX_IDLE_CONNECTIONS = "DATABASE_MAX_IDLE_CONNECTIONS" + DATABASE_MAX_OPEN_CONNECTIONS = "DATABASE_MAX_OPEN_CONNECTIONS" + DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME" ) type Database struct { - Hostname string - Name string - User string - Password string - Port int + Hostname string + Name string + User string + Password string + Port int + MaxIdle int + MaxOpen int + MaxLifetime int } func DbConnectionString(dbConfig Database) string { @@ -57,9 +63,16 @@ func (d *Database) Init() { viper.BindEnv("database.port", DATABASE_PORT) viper.BindEnv("database.user", DATABASE_USER) viper.BindEnv("database.password", DATABASE_PASSWORD) + viper.BindEnv("database.maxIdle", DATABASE_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.maxOpen", DATABASE_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.maxLifetime", DATABASE_MAX_CONN_LIFETIME) + d.Name = viper.GetString("database.name") d.Hostname = viper.GetString("database.hostname") d.Port = viper.GetInt("database.port") d.User = viper.GetString("database.user") d.Password = viper.GetString("database.password") + d.MaxIdle = viper.GetInt("database.maxIdle") + d.MaxOpen = viper.GetInt("database.maxOpen") + d.MaxLifetime = viper.GetInt("database.maxLifetime") } diff --git a/pkg/postgres/postgres.go b/pkg/postgres/postgres.go index 8062decf..812cbfd5 100644 --- a/pkg/postgres/postgres.go +++ b/pkg/postgres/postgres.go @@ -17,6 +17,8 @@ package postgres import ( + "time" + "github.com/jmoiron/sqlx" _ "github.com/lib/pq" //postgres driver "github.com/vulcanize/vulcanizedb/pkg/config" @@ -35,6 +37,16 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) { if connectErr != nil { return &DB{}, ErrDBConnectionFailed(connectErr) } + if databaseConfig.MaxOpen > 0 { + db.SetMaxOpenConns(databaseConfig.MaxOpen) + } + if databaseConfig.MaxIdle > 0 { + db.SetMaxIdleConns(databaseConfig.MaxIdle) + } + if databaseConfig.MaxLifetime > 0 { + lifetime := time.Duration(databaseConfig.MaxLifetime) * time.Second + db.SetConnMaxLifetime(lifetime) + } pg := DB{DB: db, Node: node} nodeErr := pg.CreateNode(&node) if nodeErr != nil { From e2ccd3ffdb1771162baed1dd18d4bb2dd03cf930 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 11 May 2020 18:00:30 -0500 Subject: [PATCH 3/3] update configs --- environments/superNodeBTC.toml | 14 +++++---- environments/superNodeETH.toml | 12 ++++--- pkg/config/config.go | 22 ------------- pkg/super_node/config.go | 57 ++++++++++++++++++++++++++++++++-- 4 files changed, 69 insertions(+), 36 deletions(-) delete mode 100644 pkg/config/config.go diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index f84ff97c..73178b98 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -5,16 +5,18 @@ user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD -[ipfs] - path = "~/.ipfs" # $IPFS_PATH + [database.sync] + maxIdle = 1 + [database.backFill] + maxIdle = 5 [resync] chain = "bitcoin" # $RESYNC_CHAIN type = "full" # $RESYNC_TYPE start = 0 # $RESYNC_START stop = 0 # $RESYNC_STOP - batchSize = 1 # $RESYNC_BATCH_SIZE - batchNumber = 50 # $RESYNC_BATCH_NUMBER + batchSize = 5 # $RESYNC_BATCH_SIZE + batchNumber = 5 # $RESYNC_BATCH_NUMBER clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION @@ -28,8 +30,8 @@ workers = 1 # $SUPERNODE_WORKERS backFill = true # $SUPERNODE_BACKFILL frequency = 45 # $SUPERNODE_FREQUENCY - batchSize = 1 # $SUPERNODE_BATCH_SIZE - batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + batchSize = 5 # $SUPERNODE_BATCH_SIZE + batchNumber = 5 # $SUPERNODE_BATCH_NUMBER validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL [bitcoin] diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 837c6afa..997ba92e 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -5,8 +5,10 @@ user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD -[ipfs] - path = "~/.ipfs" # $IPFS_PATH + [database.sync] + maxIdle = 1 + [database.backFill] + maxIdle = 5 [resync] chain = "ethereum" # $RESYNC_CHAIN @@ -14,9 +16,9 @@ start = 0 # $RESYNC_START stop = 0 # $RESYNC_STOP batchSize = 5 # $RESYNC_BATCH_SIZE - batchNumber = 50 # $RESYNC_BATCH_NUMBER + batchNumber = 5 # $RESYNC_BATCH_NUMBER timeout = 300 # $HTTP_TIMEOUT - clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE + clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE resetValidation = true # $RESYNC_RESET_VALIDATION [superNode] @@ -30,7 +32,7 @@ backFill = true # $SUPERNODE_BACKFILL frequency = 15 # $SUPERNODE_FREQUENCY batchSize = 5 # $SUPERNODE_BATCH_SIZE - batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + batchNumber = 5 # $SUPERNODE_BATCH_NUMBER timeout = 300 # $HTTP_TIMEOUT validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL diff --git a/pkg/config/config.go b/pkg/config/config.go deleted file mode 100644 index 87588f9d..00000000 --- a/pkg/config/config.go +++ /dev/null @@ -1,22 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package config - -type Config struct { - Database Database - Client Client -} diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 84582a80..ba1547b7 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -45,6 +45,18 @@ const ( SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE" SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER" SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL" + + SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS" + SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS" + SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME" + + BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS" + BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS" + BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME" + + SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS" + SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS" + SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME" ) // Config struct @@ -132,7 +144,8 @@ func NewSuperNodeConfig() (*Config, error) { btcWS := viper.GetString("bitcoin.wsPath") c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS) } - syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + syncDBConn := overrideDBConnConfig(c.DBConfig, Sync) + syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo) c.SyncDBConn = &syncDB } @@ -157,7 +170,8 @@ func NewSuperNodeConfig() (*Config, error) { httpPath = "127.0.0.1:8081" } c.HTTPEndpoint = httpPath - serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + serveDBConn := overrideDBConnConfig(c.DBConfig, Serve) + serveDB := utils.LoadPostgres(serveDBConn, c.NodeInfo) c.ServeDBConn = &serveDB } @@ -215,7 +229,44 @@ func (c *Config) BackFillFields() error { c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) c.ValidationLevel = viper.GetInt("superNode.validationLevel") - backFillDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + backFillDBConn := overrideDBConnConfig(c.DBConfig, BackFill) + backFillDB := utils.LoadPostgres(backFillDBConn, c.NodeInfo) c.BackFillDBConn = &backFillDB return nil } + +type mode string + +var ( + Sync mode = "sync" + BackFill mode = "backFill" + Serve mode = "serve" +) + +func overrideDBConnConfig(con config.Database, m mode) config.Database { + switch m { + case Sync: + viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME) + con.MaxIdle = viper.GetInt("database.sync.maxIdle") + con.MaxOpen = viper.GetInt("database.sync.maxOpen") + con.MaxLifetime = viper.GetInt("database.sync.maxLifetime") + case BackFill: + viper.BindEnv("database.backFill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.backFill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.backFill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME) + con.MaxIdle = viper.GetInt("database.backFill.maxIdle") + con.MaxOpen = viper.GetInt("database.backFill.maxOpen") + con.MaxLifetime = viper.GetInt("database.backFill.maxLifetime") + case Serve: + viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) + con.MaxIdle = viper.GetInt("database.server.maxIdle") + con.MaxOpen = viper.GetInt("database.server.maxOpen") + con.MaxLifetime = viper.GetInt("database.server.maxLifetime") + default: + } + return con +}