From 02ca71cfa92a1c3c8c573e64e209d254c5e3d5c6 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 30 Dec 2021 15:55:55 -0600 Subject: [PATCH] RPC server RPC utils --- pkg/rpc/http.go | 47 +++++++++++++++++++++++++ pkg/rpc/ipc.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 pkg/rpc/http.go create mode 100644 pkg/rpc/ipc.go diff --git a/pkg/rpc/http.go b/pkg/rpc/http.go new file mode 100644 index 0000000..41b5057 --- /dev/null +++ b/pkg/rpc/http.go @@ -0,0 +1,47 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package rpc + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" +) + +// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules. +func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) { + + srv := rpc.NewServer() + err := node.RegisterApis(apis, modules, srv, false) + if err != nil { + utils.Fatalf("Could not register HTTP API: %w", err) + } + handler := node.NewHTTPHandlerStack(srv, cors, vhosts) + + // start http server + _, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, handler) + if err != nil { + utils.Fatalf("Could not start RPC api: %v", err) + } + extapiURL := fmt.Sprintf("http://%v/", addr) + log.Infof("HTTP endpoint opened %s", extapiURL) + + return srv, err +} diff --git a/pkg/rpc/ipc.go b/pkg/rpc/ipc.go new file mode 100644 index 0000000..146abde --- /dev/null +++ b/pkg/rpc/ipc.go @@ -0,0 +1,91 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package rpc + +import ( + "fmt" + "net" + "os" + "path/filepath" + + "github.com/ethereum/go-ethereum/p2p/netutil" + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +var ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + maxPathSize = 108 +) + +// ipcListen will create a Unix socket on the given endpoint. +func ipcListen(endpoint string) (net.Listener, error) { + if len(endpoint) > int(maxPathSize) { + log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize), + "endpoint", endpoint) + } + + // Ensure the IPC path exists and remove any previous leftover + if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { + return nil, err + } + os.Remove(endpoint) + l, err := net.Listen("unix", endpoint) + if err != nil { + return nil, err + } + os.Chmod(endpoint, 0600) + return l, nil +} + +func ipcServe(srv *rpc.Server, listener net.Listener) { + for { + conn, err := listener.Accept() + if netutil.IsTemporaryError(err) { + log.WithError(err).Warn("rpc accept error") + continue + } + if err != nil { + log.WithError(err).Warn("unknown error") + continue + } + log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection") + go prom.IPCMiddleware(srv, conn) + } +} + +// StartIPCEndpoint starts an IPC endpoint. +func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) { + // Register all the APIs exposed by the services. + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("IPC registered", "namespace", api.Namespace) + } + // All APIs registered, start the IPC listener. + listener, err := ipcListen(ipcEndpoint) + if err != nil { + return nil, nil, err + } + + go ipcServe(handler, listener) + return listener, handler, nil +}