separate db conneciton pools for each process

This commit is contained in:
Ian Norden 2020-05-06 16:16:56 -05:00
parent fbd4a5ca6a
commit fb6fdfcc13
4 changed files with 25 additions and 17 deletions

View File

@ -7,7 +7,7 @@ CREATE TABLE btc.header_cids (
cid TEXT NOT NULL, cid TEXT NOT NULL,
timestamp NUMERIC NOT NULL, timestamp NUMERIC NOT NULL,
bits BIGINT NOT NULL, bits BIGINT NOT NULL,
node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE,
UNIQUE (block_number, block_hash) UNIQUE (block_number, block_hash)
); );

View File

@ -69,11 +69,11 @@ type BackFillService struct {
// NewBackFillService returns a new BackFillInterface // NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -81,7 +81,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
if err != nil { if err != nil {
return nil, err return nil, err
} }
retriever, err := NewCIDRetriever(settings.Chain, settings.DB) retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -53,21 +53,23 @@ type Config struct {
Chain shared.ChainType Chain shared.ChainType
IPFSPath string IPFSPath string
IPFSMode shared.IPFSMode IPFSMode shared.IPFSMode
DB *postgres.DB
DBConfig config.Database DBConfig config.Database
Quit chan bool Quit chan bool
// Server fields // Server fields
Serve bool Serve bool
ServeDBConn *postgres.DB
WSEndpoint string WSEndpoint string
HTTPEndpoint string HTTPEndpoint string
IPCEndpoint string IPCEndpoint string
// Sync params // Sync params
Sync bool Sync bool
Workers int SyncDBConn *postgres.DB
WSClient interface{} Workers int
NodeInfo core.Node WSClient interface{}
NodeInfo core.Node
// Backfiller params // Backfiller params
BackFill bool BackFill bool
BackFillDBConn *postgres.DB
HTTPClient interface{} HTTPClient interface{}
Frequency time.Duration Frequency time.Duration
BatchSize uint64 BatchSize uint64
@ -110,6 +112,8 @@ func NewSuperNodeConfig() (*Config, error) {
} }
} }
c.DBConfig.Init()
c.Sync = viper.GetBool("superNode.sync") c.Sync = viper.GetBool("superNode.sync")
if c.Sync { if c.Sync {
workers := viper.GetInt("superNode.workers") workers := viper.GetInt("superNode.workers")
@ -128,6 +132,8 @@ func NewSuperNodeConfig() (*Config, error) {
btcWS := viper.GetString("bitcoin.wsPath") btcWS := viper.GetString("bitcoin.wsPath")
c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS) c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS)
} }
syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.SyncDBConn = &syncDB
} }
c.Serve = viper.GetBool("superNode.server") c.Serve = viper.GetBool("superNode.server")
@ -151,6 +157,8 @@ func NewSuperNodeConfig() (*Config, error) {
httpPath = "127.0.0.1:8081" httpPath = "127.0.0.1:8081"
} }
c.HTTPEndpoint = httpPath c.HTTPEndpoint = httpPath
serveDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.ServeDBConn = &serveDB
} }
c.BackFill = viper.GetBool("superNode.backFill") c.BackFill = viper.GetBool("superNode.backFill")
@ -160,9 +168,6 @@ func NewSuperNodeConfig() (*Config, error) {
} }
} }
c.DBConfig.Init()
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db
c.Quit = make(chan bool) c.Quit = make(chan bool)
return c, nil return c, nil
@ -209,5 +214,8 @@ func (c *Config) BackFillFields() error {
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel") c.ValidationLevel = viper.GetInt("superNode.validationLevel")
backFillDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.BackFillDBConn = &backFillDB
return nil return nil
} }

View File

@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -124,14 +124,15 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
} }
// If we are serving, initialize the needed interfaces // If we are serving, initialize the needed interfaces
if settings.Serve { if settings.Serve {
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.DB) sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sn.db = settings.ServeDBConn
} }
sn.QuitChan = settings.Quit sn.QuitChan = settings.Quit
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
@ -140,7 +141,6 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
sn.NodeInfo = &settings.NodeInfo sn.NodeInfo = &settings.NodeInfo
sn.ipfsPath = settings.IPFSPath sn.ipfsPath = settings.IPFSPath
sn.chain = settings.Chain sn.chain = settings.Chain
sn.db = settings.DB
return sn, nil return sn, nil
} }