integrate into serve command

This commit is contained in:
Ian Norden 2020-10-27 14:08:10 -05:00
parent a8dd77294a
commit 16aa9652a5
3 changed files with 52 additions and 21 deletions

View File

@ -20,6 +20,8 @@ import (
"os/signal"
"sync"
"github.com/vulcanize/ipld-eth-server/pkg/graphql"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -28,7 +30,6 @@ import (
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
v "github.com/vulcanize/ipld-eth-server/version"
)
@ -91,14 +92,37 @@ func startServers(server s.Server, settings *s.Config) error {
}
logWithCommand.Info("starting up HTTP server")
_, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
if err != nil {
return err
}
return startGraphQL(server)
}
return err
func startGraphQL(server s.Server) error {
viper.BindEnv("server.graphql", "SERVER_GRAPHQL")
if viper.GetBool("server.graphql") {
logWithCommand.Info("starting up GraphQL server")
viper.BindEnv("server.graphqlEndpoint", "SERVER_GRAPHQL_ENDPOINT")
endPoint := viper.GetString("server.graphqlEndpoint")
if endPoint != "" {
graphQLServer, err := graphql.New(server.Backend(), endPoint, nil, []string{"*"}, rpc.HTTPTimeouts{})
if err != nil {
return err
}
if err := graphQLServer.Start(nil); err != nil {
return err
}
}
}
return nil
}
func init() {
rootCmd.AddCommand(serveCmd)
// flags for all config variables
serveCmd.PersistentFlags().Bool("server-graphql", false, "turn on the graphql server")
serveCmd.PersistentFlags().String("server-graphql-endpoint", "", "endpoint url for graphql server")
serveCmd.PersistentFlags().String("server-ws-path", "", "vdb server ws path")
serveCmd.PersistentFlags().String("server-http-path", "", "vdb server http path")
serveCmd.PersistentFlags().String("server-ipc-path", "", "vdb server ipc path")
@ -113,6 +137,8 @@ func init() {
serveCmd.PersistentFlags().String("eth-rpc-gas-cap", "", "rpc gas cap (for eth_Call execution)")
// and their bindings
viper.BindPFlag("server.graphql", serveCmd.PersistentFlags().Lookup("server-graphql"))
viper.BindPFlag("server.graphqlEndpoint", serveCmd.PersistentFlags().Lookup("server-graphql-endpoint"))
viper.BindPFlag("server.wsPath", serveCmd.PersistentFlags().Lookup("server-ws-path"))
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path"))
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path"))

View File

@ -12,6 +12,8 @@
ipcPath = "~/.vulcanize/vulcanize.ipc" # $SERVER_IPC_PATH
wsPath = "127.0.0.1:8081" # $SERVER_WS_PATH
httpPath = "127.0.0.1:8082" # $SERVER_HTTP_PATH
graphql = true # $SERVER_GRAPHQL
graphqlEndpoint = "127.0.0.1:8083" # $SERVER_GRAPHQL_ENDPOINT
[ethereum]
chainID = "1" # $ETH_CHAIN_ID

View File

@ -52,6 +52,8 @@ type Server interface {
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings)
// Method to unsubscribe from the service
Unsubscribe(id rpc.ID)
// Backend exposes the server's backend
Backend() *eth.Backend
}
// Service is the underlying struct for the watcher
@ -74,29 +76,30 @@ type Service struct {
db *postgres.DB
// wg for syncing serve processes
serveWg *sync.WaitGroup
// config for backend
config *eth.Config
// rpc client for forwarding cache misses
client *rpc.Client
// backend for the server
backend *eth.Backend
}
// NewServer creates a new Server using an underlying Service struct
func NewServer(settings *Config) (Server, error) {
sn := new(Service)
sn.Retriever = eth.NewCIDRetriever(settings.DB)
sn.IPLDFetcher = eth.NewIPLDFetcher(settings.DB)
sn.Filterer = eth.NewResponseFilterer()
sn.db = settings.DB
sn.QuitChan = make(chan bool)
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sn.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
sn.config = &eth.Config{
sap := new(Service)
sap.Retriever = eth.NewCIDRetriever(settings.DB)
sap.IPLDFetcher = eth.NewIPLDFetcher(settings.DB)
sap.Filterer = eth.NewResponseFilterer()
sap.db = settings.DB
sap.QuitChan = make(chan bool)
sap.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sap.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
var err error
sap.backend, err = eth.NewEthBackend(sap.db, &eth.Config{
ChainConfig: settings.ChainConfig,
VmConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
}
return sn, nil
})
return sap, err
}
// Protocols exports the services p2p protocols, this service has none
@ -133,15 +136,10 @@ func (sap *Service) APIs() []rpc.API {
Public: true,
},
}
backend, err := eth.NewEthBackend(sap.db, sap.config)
if err != nil {
log.Error(err)
return nil
}
return append(apis, rpc.API{
Namespace: eth.APIName,
Version: eth.APIVersion,
Service: eth.NewPublicEthAPI(backend, sap.client),
Service: eth.NewPublicEthAPI(sap.backend, sap.client),
Public: true,
})
}
@ -358,6 +356,11 @@ func (sap *Service) Stop() error {
return nil
}
// Backend exposes the server's backend
func (sap *Service) Backend() *eth.Backend {
return sap.backend
}
// close is used to close all listening subscriptions
// close needs to be called with subscription access locked
func (sap *Service) close() {