split btc and eth super node processes into serpate containers; dockerfile for Postgraphile with plugins

This commit is contained in:
Ian Norden 2020-03-17 08:37:00 -05:00
parent 93e7eb35c5
commit e7225317d4
6 changed files with 174 additions and 131 deletions

View File

@ -54,7 +54,7 @@ func init() {
}
func superNode() {
superNodeConfigs, err := super_node.NewSuperNodeConfigs()
superNodeConfig, err := super_node.NewSuperNodeConfig()
if err != nil {
logWithCommand.Fatal(err)
}
@ -62,31 +62,29 @@ func superNode() {
logWithCommand.Fatal(err)
}
wg := &sync.WaitGroup{}
for _, superNodeConfig := range superNodeConfigs {
superNode, err := super_node.NewSuperNode(superNodeConfig)
superNode, err := super_node.NewSuperNode(superNodeConfig)
if err != nil {
logWithCommand.Fatal(err)
}
var forwardPayloadChan chan shared.ConvertedData
if superNodeConfig.Serve {
forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize)
superNode.FilterAndServe(wg, forwardPayloadChan)
if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.Sync {
if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.BackFill {
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
if err != nil {
logWithCommand.Fatal(err)
}
var forwardPayloadChan chan shared.ConvertedData
if superNodeConfig.Serve {
forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize)
superNode.FilterAndServe(wg, forwardPayloadChan)
if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.Sync {
if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.BackFill {
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
if err != nil {
logWithCommand.Fatal(err)
}
backFiller.FillGapsInSuperNode(wg)
}
backFiller.FillGapsInSuperNode(wg)
}
wg.Wait()
}

View File

@ -0,0 +1,8 @@
FROM node:alpine
RUN npm install -g postgraphile
RUN npm install -g postgraphile-plugin-connection-filter
RUN npm install -g @graphile/pg-pubsub
EXPOSE 5000
ENTRYPOINT ["postgraphile", "-n", "0.0.0.0"]

View File

@ -34,8 +34,6 @@ ARG USER
ARG CONFIG_FILE
ARG EXPOSE_PORT_1
ARG EXPOSE_PORT_2
ARG EXPOSE_PORT_3
ARG EXPOSE_PORT_4
RUN adduser -Du 5000 $USER
WORKDIR /app
@ -57,7 +55,5 @@ COPY --from=builder /go/src/github.com/ipfs/go-ipfs/ipfs ipfs
EXPOSE $EXPOSE_PORT_1
EXPOSE $EXPOSE_PORT_2
EXPOSE $EXPOSE_PORT_3
EXPOSE $EXPOSE_PORT_4
CMD ["./startup_script.sh"]

View File

@ -2,6 +2,7 @@ version: '3.2'
services:
db:
restart: always
image: postgres:10.12-alpine
environment:
POSTGRES_USER: "postgres"
@ -13,13 +14,8 @@ services:
- "5432"
ports:
- "127.0.0.1:8079:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 30
supernode:
btc:
depends_on:
- db
build:
@ -29,7 +25,29 @@ services:
- golang:1.12.4
dockerfile: ./dockerfiles/super_node/Dockerfile
args:
CONFIG_FILE: ./environments/superNode.toml
CONFIG_FILE: ./environments/superNodeBTC.toml
environment:
IPFS_INIT: "true"
VDB_PG_NAME: "vulcanize_public"
VDB_PG_HOSTNAME: "db"
VDB_PG_PORT: 5432
VDB_PG_USER: "postgres"
VDB_PG_PASSWORD: "password"
ports:
- "127.0.0.1:8082:8082"
- "127.0.0.1:8083:8083"
eth:
depends_on:
- db
build:
context: ./../../
cache_from:
- alpine:latest
- golang:1.12.4
dockerfile: ./dockerfiles/super_node/Dockerfile
args:
CONFIG_FILE: ./environments/superNodeETH.toml
environment:
IPFS_INIT: "true"
VDB_PG_NAME: "vulcanize_public"
@ -40,9 +58,30 @@ services:
ports:
- "127.0.0.1:8080:8080"
- "127.0.0.1:8081:8081"
- "127.0.0.1:8082:8082"
- "127.0.0.1:8083:8083"
graphql:
restart: always
depends_on:
- db
build:
context: ./../../
cache_from:
- node:alpine
dockerfile: ./dockerfiles/postgraphile/Dockerfile
environment:
DATABASE_URL: postgres://postgres:password@db:5432/vulcanize_public
expose:
- "5000"
ports:
- "127.0.0.1:5000:5000"
command: ["--plugins", "@graphile/pg-pubsub",
"--subscriptions",
"--simple-subscriptions",
"--connection", $DATABASE_URL,
"--host", "0.0.0.0",
"--port", "5000",
"--schema", "public,btc,eth"
"--append-plugins", "postgraphile-plugin-connection-filter"]
volumes:
vulcanizedb_db_data:
vulcanizedb_db_data:

View File

@ -17,7 +17,6 @@
package super_node
import (
"fmt"
"os"
"path/filepath"
"time"
@ -60,12 +59,18 @@ type Config struct {
BatchSize uint64
}
// NewSuperNodeConfigs is used to initialize multiple SuperNode configs from a single config .toml file
// Separate chain supernode instances need to be ran in the same process in order to avoid lock contention on the ipfs repository
func NewSuperNodeConfigs() ([]*Config, error) {
chains := viper.GetStringSlice("superNode.chains")
configs := make([]*Config, len(chains))
// NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file
// Separate chain supernode instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile
func NewSuperNodeConfig() (*Config, error) {
c := new(Config)
var err error
chain := viper.GetString("superNode.chain")
c.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
}
ipfsPath := viper.GetString("superNode.ipfsPath")
if ipfsPath == "" {
home, err := os.UserHomeDir()
@ -74,83 +79,80 @@ func NewSuperNodeConfigs() ([]*Config, error) {
}
ipfsPath = filepath.Join(home, ".ipfs")
}
for i, chain := range chains {
sn := new(Config)
sn.Chain, err = shared.NewChainType(chain)
if err != nil {
return nil, err
c.IPFSPath = ipfsPath
c.Sync = viper.GetBool("superNode.sync")
if c.Sync {
workers := viper.GetInt("superNode.workers")
if workers < 1 {
workers = 1
}
sn.DBConfig = config.Database{
Name: viper.GetString(fmt.Sprintf("superNode.%s.database.name", chain)),
Hostname: viper.GetString(fmt.Sprintf("superNode.%s.database.hostname", chain)),
Port: viper.GetInt(fmt.Sprintf("superNode.%s.database.port", chain)),
User: viper.GetString(fmt.Sprintf("superNode.%s.database.user", chain)),
Password: viper.GetString(fmt.Sprintf("superNode.%s.database.password", chain)),
}
sn.IPFSPath = ipfsPath
sn.Serve = viper.GetBool(fmt.Sprintf("superNode.%s.server.on", chain))
sn.Sync = viper.GetBool(fmt.Sprintf("superNode.%s.sync.on", chain))
if sn.Sync {
workers := viper.GetInt("superNode.sync.workers")
if workers < 1 {
workers = 1
}
sn.Workers = workers
switch sn.Chain {
case shared.Ethereum:
sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.sync.wsPath"))
if err != nil {
return nil, err
}
case shared.Bitcoin:
sn.NodeInfo = core.Node{
ID: viper.GetString("superNode.bitcoin.node.nodeID"),
ClientName: viper.GetString("superNode.bitcoin.node.clientName"),
GenesisBlock: viper.GetString("superNode.bitcoin.node.genesisBlock"),
NetworkID: viper.GetString("superNode.bitcoin.node.networkID"),
}
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
sn.WSClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.bitcoin.sync.wsPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.bitcoin.sync.pass"),
User: viper.GetString("superNode.bitcoin.sync.user"),
}
}
}
if sn.Serve {
wsPath := viper.GetString(fmt.Sprintf("superNode.%s.server.wsPath", chain))
if wsPath == "" {
wsPath = "ws://127.0.0.1:8546"
}
sn.WSEndpoint = wsPath
ipcPath := viper.GetString(fmt.Sprintf("superNode.%s.server.ipcPath", chain))
if ipcPath == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
}
sn.IPCEndpoint = ipcPath
httpPath := viper.GetString(fmt.Sprintf("superNode.%s.server.httpPath", chain))
if httpPath == "" {
httpPath = "http://127.0.0.1:8545"
}
sn.HTTPEndpoint = httpPath
}
db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo)
sn.DB = &db
sn.Quit = make(chan bool)
if viper.GetBool(fmt.Sprintf("superNode.%s.backFill.on", chain)) {
if err := sn.BackFillFields(chain); err != nil {
c.Workers = workers
switch c.Chain {
case shared.Ethereum:
c.NodeInfo, c.WSClient, err = getEthNodeAndClient(viper.GetString("ethereum.wsPath"))
if err != nil {
return nil, err
}
case shared.Bitcoin:
c.NodeInfo = core.Node{
ID: viper.GetString("bitcoin.nodeID"),
ClientName: viper.GetString("bitcoin.clientName"),
GenesisBlock: viper.GetString("bitcoin.genesisBlock"),
NetworkID: viper.GetString("bitcoin.networkID"),
}
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
c.WSClient = &rpcclient.ConnConfig{
Host: viper.GetString("bitcoin.wsPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("bitcoin.pass"),
User: viper.GetString("bitcoin.user"),
}
}
configs[i] = sn
}
return configs, nil
c.Serve = viper.GetBool("superNode.server")
if c.Serve {
wsPath := viper.GetString("superNode.wsPath")
if wsPath == "" {
wsPath = "ws://127.0.0.1:8546"
}
c.WSEndpoint = wsPath
ipcPath := viper.GetString("superNode.ipcPath")
if ipcPath == "" {
home, err := os.UserHomeDir()
if err != nil {
return nil, err
}
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
}
c.IPCEndpoint = ipcPath
httpPath := viper.GetString("superNode.httpPath")
if httpPath == "" {
httpPath = "http://127.0.0.1:8545"
}
c.HTTPEndpoint = httpPath
}
c.DBConfig = config.Database{
Name: viper.GetString("database.name"),
Hostname: viper.GetString("database.hostname"),
Port: viper.GetInt("database.port"),
User: viper.GetString("database.user"),
Password: viper.GetString("database.password"),
}
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db
c.Quit = make(chan bool)
if viper.GetBool("superNode.backFill") {
if err := c.BackFillFields(chain); err != nil {
return nil, err
}
}
return c, nil
}
// BackFillFields is used to fill in the BackFill fields of the config
@ -160,21 +162,21 @@ func (sn *Config) BackFillFields(chain string) error {
var err error
switch sn.Chain {
case shared.Ethereum:
_, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.backFill.httpPath"))
_, httpClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath"))
if err != nil {
return err
}
case shared.Bitcoin:
httpClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.bitcoin.backFill.httpPath"),
Host: viper.GetString("bitcoin.httpPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.bitcoin.backFill.pass"),
User: viper.GetString("superNode.bitcoin.backFill.user"),
Pass: viper.GetString("bitcoin.pass"),
User: viper.GetString("bitcoin.user"),
}
}
sn.HTTPClient = httpClient
freq := viper.GetInt(fmt.Sprintf("superNode.%s.backFill.frequency", chain))
freq := viper.GetInt("superNode.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Second * 30
@ -182,7 +184,7 @@ func (sn *Config) BackFillFields(chain string) error {
frequency = time.Second * time.Duration(freq)
}
sn.Frequency = frequency
sn.BatchSize = uint64(viper.GetInt64(fmt.Sprintf("superNode.%s.backFill.batchSize", chain)))
sn.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
return nil
}

View File

@ -57,7 +57,7 @@ type Config struct {
func NewReSyncConfig() (*Config, error) {
c := new(Config)
var err error
ipfsPath := viper.GetString("superNode.ipfsPath")
ipfsPath := viper.GetString("resync.ipfsPath")
if ipfsPath == "" {
home, err := os.UserHomeDir()
if err != nil {
@ -94,24 +94,24 @@ func NewReSyncConfig() (*Config, error) {
switch c.Chain {
case shared.Ethereum:
c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(viper.GetString("ethereum.node.httpPath"))
c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath"))
if err != nil {
return nil, err
}
case shared.Bitcoin:
c.NodeInfo = core.Node{
ID: viper.GetString("bitcoin.node.nodeID"),
ClientName: viper.GetString("bitcoin.node.clientName"),
GenesisBlock: viper.GetString("bitcoin.node.genesisBlock"),
NetworkID: viper.GetString("bitcoin.node.networkID"),
ID: viper.GetString("bitcoin.nodeID"),
ClientName: viper.GetString("bitcoin.clientName"),
GenesisBlock: viper.GetString("bitcoin.genesisBlock"),
NetworkID: viper.GetString("bitcoin.networkID"),
}
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
c.HTTPClient = &rpcclient.ConnConfig{
Host: viper.GetString("bitcoin.node.httpPath"),
Host: viper.GetString("bitcoin.httpPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("bitcoin.node.pass"),
User: viper.GetString("bitcoin.node.user"),
Pass: viper.GetString("bitcoin.pass"),
User: viper.GetString("bitcoin.user"),
}
}
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)