From e7225317d4a082532e6d5781e6ca98219a0952f6 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 17 Mar 2020 08:37:00 -0500 Subject: [PATCH] split btc and eth super node processes into serpate containers; dockerfile for Postgraphile with plugins --- cmd/superNode.go | 44 +++--- dockerfiles/postgraphile/Dockerfile | 8 + dockerfiles/super_node/Dockerfile | 4 - dockerfiles/super_node/docker-compose.yml | 61 ++++++-- pkg/super_node/config.go | 170 +++++++++++----------- pkg/super_node/resync/config.go | 18 +-- 6 files changed, 174 insertions(+), 131 deletions(-) create mode 100644 dockerfiles/postgraphile/Dockerfile diff --git a/cmd/superNode.go b/cmd/superNode.go index d66986a7..93f1cbc9 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -54,7 +54,7 @@ func init() { } func superNode() { - superNodeConfigs, err := super_node.NewSuperNodeConfigs() + superNodeConfig, err := super_node.NewSuperNodeConfig() if err != nil { logWithCommand.Fatal(err) } @@ -62,31 +62,29 @@ func superNode() { logWithCommand.Fatal(err) } wg := &sync.WaitGroup{} - for _, superNodeConfig := range superNodeConfigs { - superNode, err := super_node.NewSuperNode(superNodeConfig) + superNode, err := super_node.NewSuperNode(superNodeConfig) + if err != nil { + logWithCommand.Fatal(err) + } + var forwardPayloadChan chan shared.ConvertedData + if superNodeConfig.Serve { + forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize) + superNode.FilterAndServe(wg, forwardPayloadChan) + if err := startServers(superNode, superNodeConfig); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.Sync { + if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.BackFill { + backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) if err != nil { logWithCommand.Fatal(err) } - var forwardPayloadChan chan shared.ConvertedData - if superNodeConfig.Serve { - forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize) - superNode.FilterAndServe(wg, forwardPayloadChan) - if err := startServers(superNode, superNodeConfig); err != nil { - logWithCommand.Fatal(err) - } - } - if superNodeConfig.Sync { - if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil { - logWithCommand.Fatal(err) - } - } - if superNodeConfig.BackFill { - backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) - if err != nil { - logWithCommand.Fatal(err) - } - backFiller.FillGapsInSuperNode(wg) - } + backFiller.FillGapsInSuperNode(wg) } wg.Wait() } diff --git a/dockerfiles/postgraphile/Dockerfile b/dockerfiles/postgraphile/Dockerfile new file mode 100644 index 00000000..46329cf8 --- /dev/null +++ b/dockerfiles/postgraphile/Dockerfile @@ -0,0 +1,8 @@ +FROM node:alpine + +RUN npm install -g postgraphile +RUN npm install -g postgraphile-plugin-connection-filter +RUN npm install -g @graphile/pg-pubsub + +EXPOSE 5000 +ENTRYPOINT ["postgraphile", "-n", "0.0.0.0"] \ No newline at end of file diff --git a/dockerfiles/super_node/Dockerfile b/dockerfiles/super_node/Dockerfile index deb1a26a..af3d4f27 100644 --- a/dockerfiles/super_node/Dockerfile +++ b/dockerfiles/super_node/Dockerfile @@ -34,8 +34,6 @@ ARG USER ARG CONFIG_FILE ARG EXPOSE_PORT_1 ARG EXPOSE_PORT_2 -ARG EXPOSE_PORT_3 -ARG EXPOSE_PORT_4 RUN adduser -Du 5000 $USER WORKDIR /app @@ -57,7 +55,5 @@ COPY --from=builder /go/src/github.com/ipfs/go-ipfs/ipfs ipfs EXPOSE $EXPOSE_PORT_1 EXPOSE $EXPOSE_PORT_2 -EXPOSE $EXPOSE_PORT_3 -EXPOSE $EXPOSE_PORT_4 CMD ["./startup_script.sh"] diff --git a/dockerfiles/super_node/docker-compose.yml b/dockerfiles/super_node/docker-compose.yml index e842cd97..126f0861 100644 --- a/dockerfiles/super_node/docker-compose.yml +++ b/dockerfiles/super_node/docker-compose.yml @@ -2,6 +2,7 @@ version: '3.2' services: db: + restart: always image: postgres:10.12-alpine environment: POSTGRES_USER: "postgres" @@ -13,13 +14,8 @@ services: - "5432" ports: - "127.0.0.1:8079:5432" - healthcheck: - test: ["CMD-SHELL", "pg_isready -U postgres"] - interval: 5s - timeout: 5s - retries: 30 - supernode: + btc: depends_on: - db build: @@ -29,7 +25,29 @@ services: - golang:1.12.4 dockerfile: ./dockerfiles/super_node/Dockerfile args: - CONFIG_FILE: ./environments/superNode.toml + CONFIG_FILE: ./environments/superNodeBTC.toml + environment: + IPFS_INIT: "true" + VDB_PG_NAME: "vulcanize_public" + VDB_PG_HOSTNAME: "db" + VDB_PG_PORT: 5432 + VDB_PG_USER: "postgres" + VDB_PG_PASSWORD: "password" + ports: + - "127.0.0.1:8082:8082" + - "127.0.0.1:8083:8083" + + eth: + depends_on: + - db + build: + context: ./../../ + cache_from: + - alpine:latest + - golang:1.12.4 + dockerfile: ./dockerfiles/super_node/Dockerfile + args: + CONFIG_FILE: ./environments/superNodeETH.toml environment: IPFS_INIT: "true" VDB_PG_NAME: "vulcanize_public" @@ -40,9 +58,30 @@ services: ports: - "127.0.0.1:8080:8080" - "127.0.0.1:8081:8081" - - "127.0.0.1:8082:8082" - - "127.0.0.1:8083:8083" + + graphql: + restart: always + depends_on: + - db + build: + context: ./../../ + cache_from: + - node:alpine + dockerfile: ./dockerfiles/postgraphile/Dockerfile + environment: + DATABASE_URL: postgres://postgres:password@db:5432/vulcanize_public + expose: + - "5000" + ports: + - "127.0.0.1:5000:5000" + command: ["--plugins", "@graphile/pg-pubsub", + "--subscriptions", + "--simple-subscriptions", + "--connection", $DATABASE_URL, + "--host", "0.0.0.0", + "--port", "5000", + "--schema", "public,btc,eth" + "--append-plugins", "postgraphile-plugin-connection-filter"] volumes: - vulcanizedb_db_data: - + vulcanizedb_db_data: \ No newline at end of file diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index 119c4827..2e2f76de 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -17,7 +17,6 @@ package super_node import ( - "fmt" "os" "path/filepath" "time" @@ -60,12 +59,18 @@ type Config struct { BatchSize uint64 } -// NewSuperNodeConfigs is used to initialize multiple SuperNode configs from a single config .toml file -// Separate chain supernode instances need to be ran in the same process in order to avoid lock contention on the ipfs repository -func NewSuperNodeConfigs() ([]*Config, error) { - chains := viper.GetStringSlice("superNode.chains") - configs := make([]*Config, len(chains)) +// NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file +// Separate chain supernode instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile +func NewSuperNodeConfig() (*Config, error) { + c := new(Config) var err error + + chain := viper.GetString("superNode.chain") + c.Chain, err = shared.NewChainType(chain) + if err != nil { + return nil, err + } + ipfsPath := viper.GetString("superNode.ipfsPath") if ipfsPath == "" { home, err := os.UserHomeDir() @@ -74,83 +79,80 @@ func NewSuperNodeConfigs() ([]*Config, error) { } ipfsPath = filepath.Join(home, ".ipfs") } - for i, chain := range chains { - sn := new(Config) - sn.Chain, err = shared.NewChainType(chain) - if err != nil { - return nil, err + c.IPFSPath = ipfsPath + + c.Sync = viper.GetBool("superNode.sync") + if c.Sync { + workers := viper.GetInt("superNode.workers") + if workers < 1 { + workers = 1 } - sn.DBConfig = config.Database{ - Name: viper.GetString(fmt.Sprintf("superNode.%s.database.name", chain)), - Hostname: viper.GetString(fmt.Sprintf("superNode.%s.database.hostname", chain)), - Port: viper.GetInt(fmt.Sprintf("superNode.%s.database.port", chain)), - User: viper.GetString(fmt.Sprintf("superNode.%s.database.user", chain)), - Password: viper.GetString(fmt.Sprintf("superNode.%s.database.password", chain)), - } - sn.IPFSPath = ipfsPath - sn.Serve = viper.GetBool(fmt.Sprintf("superNode.%s.server.on", chain)) - sn.Sync = viper.GetBool(fmt.Sprintf("superNode.%s.sync.on", chain)) - if sn.Sync { - workers := viper.GetInt("superNode.sync.workers") - if workers < 1 { - workers = 1 - } - sn.Workers = workers - switch sn.Chain { - case shared.Ethereum: - sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.sync.wsPath")) - if err != nil { - return nil, err - } - case shared.Bitcoin: - sn.NodeInfo = core.Node{ - ID: viper.GetString("superNode.bitcoin.node.nodeID"), - ClientName: viper.GetString("superNode.bitcoin.node.clientName"), - GenesisBlock: viper.GetString("superNode.bitcoin.node.genesisBlock"), - NetworkID: viper.GetString("superNode.bitcoin.node.networkID"), - } - // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node - sn.WSClient = &rpcclient.ConnConfig{ - Host: viper.GetString("superNode.bitcoin.sync.wsPath"), - HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode - DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("superNode.bitcoin.sync.pass"), - User: viper.GetString("superNode.bitcoin.sync.user"), - } - } - } - if sn.Serve { - wsPath := viper.GetString(fmt.Sprintf("superNode.%s.server.wsPath", chain)) - if wsPath == "" { - wsPath = "ws://127.0.0.1:8546" - } - sn.WSEndpoint = wsPath - ipcPath := viper.GetString(fmt.Sprintf("superNode.%s.server.ipcPath", chain)) - if ipcPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") - } - sn.IPCEndpoint = ipcPath - httpPath := viper.GetString(fmt.Sprintf("superNode.%s.server.httpPath", chain)) - if httpPath == "" { - httpPath = "http://127.0.0.1:8545" - } - sn.HTTPEndpoint = httpPath - } - db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo) - sn.DB = &db - sn.Quit = make(chan bool) - if viper.GetBool(fmt.Sprintf("superNode.%s.backFill.on", chain)) { - if err := sn.BackFillFields(chain); err != nil { + c.Workers = workers + switch c.Chain { + case shared.Ethereum: + c.NodeInfo, c.WSClient, err = getEthNodeAndClient(viper.GetString("ethereum.wsPath")) + if err != nil { return nil, err } + case shared.Bitcoin: + c.NodeInfo = core.Node{ + ID: viper.GetString("bitcoin.nodeID"), + ClientName: viper.GetString("bitcoin.clientName"), + GenesisBlock: viper.GetString("bitcoin.genesisBlock"), + NetworkID: viper.GetString("bitcoin.networkID"), + } + // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node + c.WSClient = &rpcclient.ConnConfig{ + Host: viper.GetString("bitcoin.wsPath"), + HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode + DisableTLS: true, // Bitcoin core does not provide TLS by default + Pass: viper.GetString("bitcoin.pass"), + User: viper.GetString("bitcoin.user"), + } } - configs[i] = sn } - return configs, nil + + c.Serve = viper.GetBool("superNode.server") + if c.Serve { + wsPath := viper.GetString("superNode.wsPath") + if wsPath == "" { + wsPath = "ws://127.0.0.1:8546" + } + c.WSEndpoint = wsPath + ipcPath := viper.GetString("superNode.ipcPath") + if ipcPath == "" { + home, err := os.UserHomeDir() + if err != nil { + return nil, err + } + ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") + } + c.IPCEndpoint = ipcPath + httpPath := viper.GetString("superNode.httpPath") + if httpPath == "" { + httpPath = "http://127.0.0.1:8545" + } + c.HTTPEndpoint = httpPath + } + + c.DBConfig = config.Database{ + Name: viper.GetString("database.name"), + Hostname: viper.GetString("database.hostname"), + Port: viper.GetInt("database.port"), + User: viper.GetString("database.user"), + Password: viper.GetString("database.password"), + } + + db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.DB = &db + c.Quit = make(chan bool) + if viper.GetBool("superNode.backFill") { + if err := c.BackFillFields(chain); err != nil { + return nil, err + } + } + + return c, nil } // BackFillFields is used to fill in the BackFill fields of the config @@ -160,21 +162,21 @@ func (sn *Config) BackFillFields(chain string) error { var err error switch sn.Chain { case shared.Ethereum: - _, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.backFill.httpPath")) + _, httpClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath")) if err != nil { return err } case shared.Bitcoin: httpClient = &rpcclient.ConnConfig{ - Host: viper.GetString("superNode.bitcoin.backFill.httpPath"), + Host: viper.GetString("bitcoin.httpPath"), HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("superNode.bitcoin.backFill.pass"), - User: viper.GetString("superNode.bitcoin.backFill.user"), + Pass: viper.GetString("bitcoin.pass"), + User: viper.GetString("bitcoin.user"), } } sn.HTTPClient = httpClient - freq := viper.GetInt(fmt.Sprintf("superNode.%s.backFill.frequency", chain)) + freq := viper.GetInt("superNode.frequency") var frequency time.Duration if freq <= 0 { frequency = time.Second * 30 @@ -182,7 +184,7 @@ func (sn *Config) BackFillFields(chain string) error { frequency = time.Second * time.Duration(freq) } sn.Frequency = frequency - sn.BatchSize = uint64(viper.GetInt64(fmt.Sprintf("superNode.%s.backFill.batchSize", chain))) + sn.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) return nil } diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index 1fb9661a..7dcb49ab 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -57,7 +57,7 @@ type Config struct { func NewReSyncConfig() (*Config, error) { c := new(Config) var err error - ipfsPath := viper.GetString("superNode.ipfsPath") + ipfsPath := viper.GetString("resync.ipfsPath") if ipfsPath == "" { home, err := os.UserHomeDir() if err != nil { @@ -94,24 +94,24 @@ func NewReSyncConfig() (*Config, error) { switch c.Chain { case shared.Ethereum: - c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(viper.GetString("ethereum.node.httpPath")) + c.NodeInfo, c.HTTPClient, err = getEthNodeAndClient(viper.GetString("ethereum.httpPath")) if err != nil { return nil, err } case shared.Bitcoin: c.NodeInfo = core.Node{ - ID: viper.GetString("bitcoin.node.nodeID"), - ClientName: viper.GetString("bitcoin.node.clientName"), - GenesisBlock: viper.GetString("bitcoin.node.genesisBlock"), - NetworkID: viper.GetString("bitcoin.node.networkID"), + ID: viper.GetString("bitcoin.nodeID"), + ClientName: viper.GetString("bitcoin.clientName"), + GenesisBlock: viper.GetString("bitcoin.genesisBlock"), + NetworkID: viper.GetString("bitcoin.networkID"), } // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node c.HTTPClient = &rpcclient.ConnConfig{ - Host: viper.GetString("bitcoin.node.httpPath"), + Host: viper.GetString("bitcoin.httpPath"), HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("bitcoin.node.pass"), - User: viper.GetString("bitcoin.node.user"), + Pass: viper.GetString("bitcoin.pass"), + User: viper.GetString("bitcoin.user"), } } db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)