From fb6fdfcc135d64da92ecd00f63a85f278dfb367f Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 6 May 2020 16:16:56 -0500 Subject: [PATCH] separate db conneciton pools for each process --- .../00020_create_btc_header_cids_table.sql | 2 +- pkg/super_node/backfiller.go | 6 ++--- pkg/super_node/config.go | 24 ++++++++++++------- pkg/super_node/service.go | 10 ++++---- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/db/migrations/00020_create_btc_header_cids_table.sql b/db/migrations/00020_create_btc_header_cids_table.sql index 9d2c1553..04a6f65e 100644 --- a/db/migrations/00020_create_btc_header_cids_table.sql +++ b/db/migrations/00020_create_btc_header_cids_table.sql @@ -7,7 +7,7 @@ CREATE TABLE btc.header_cids ( cid TEXT NOT NULL, timestamp NUMERIC NOT NULL, bits BIGINT NOT NULL, - node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, + node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, UNIQUE (block_number, block_hash) ); diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 48a89110..712a0425 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -69,11 +69,11 @@ type BackFillService struct { // NewBackFillService returns a new BackFillInterface func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { - publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode) if err != nil { return nil, err } - indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert if err != nil { return nil, err } - retriever, err := NewCIDRetriever(settings.Chain, settings.DB) + retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn) if err != nil { return nil, err } diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 2bb83cc7..84582a80 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -53,21 +53,23 @@ type Config struct { Chain shared.ChainType IPFSPath string IPFSMode shared.IPFSMode - DB *postgres.DB DBConfig config.Database Quit chan bool // Server fields Serve bool + ServeDBConn *postgres.DB WSEndpoint string HTTPEndpoint string IPCEndpoint string // Sync params - Sync bool - Workers int - WSClient interface{} - NodeInfo core.Node + Sync bool + SyncDBConn *postgres.DB + Workers int + WSClient interface{} + NodeInfo core.Node // Backfiller params BackFill bool + BackFillDBConn *postgres.DB HTTPClient interface{} Frequency time.Duration BatchSize uint64 @@ -110,6 +112,8 @@ func NewSuperNodeConfig() (*Config, error) { } } + c.DBConfig.Init() + c.Sync = viper.GetBool("superNode.sync") if c.Sync { workers := viper.GetInt("superNode.workers") @@ -128,6 +132,8 @@ func NewSuperNodeConfig() (*Config, error) { btcWS := viper.GetString("bitcoin.wsPath") c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS) } + syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.SyncDBConn = &syncDB } c.Serve = viper.GetBool("superNode.server") @@ -151,6 +157,8 @@ func NewSuperNodeConfig() (*Config, error) { httpPath = "127.0.0.1:8081" } c.HTTPEndpoint = httpPath + serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.ServeDBConn = &serveDB } c.BackFill = viper.GetBool("superNode.backFill") @@ -160,9 +168,6 @@ func NewSuperNodeConfig() (*Config, error) { } } - c.DBConfig.Init() - db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) - c.DB = &db c.Quit = make(chan bool) return c, nil @@ -209,5 +214,8 @@ func (c *Config) BackFillFields() error { c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) c.ValidationLevel = viper.GetInt("superNode.validationLevel") + + backFillDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.BackFillDBConn = &backFillDB return nil } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index c952a2db..1f09ea50 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) { if err != nil { return nil, err } - sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode) if err != nil { return nil, err } - sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode) if err != nil { return nil, err } @@ -124,14 +124,15 @@ func NewSuperNode(settings *Config) (SuperNode, error) { } // If we are serving, initialize the needed interfaces if settings.Serve { - sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.DB) + sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn) if err != nil { return nil, err } - sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) + sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode) if err != nil { return nil, err } + sn.db = settings.ServeDBConn } sn.QuitChan = settings.Quit sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) @@ -140,7 +141,6 @@ func NewSuperNode(settings *Config) (SuperNode, error) { sn.NodeInfo = &settings.NodeInfo sn.ipfsPath = settings.IPFSPath sn.chain = settings.Chain - sn.db = settings.DB return sn, nil }