From 48fb5bcd27318575a8c8fb091ba5dc4c9880e9f1 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 13 Feb 2020 16:50:56 -0600 Subject: [PATCH] separate super node instances for different chains need to be within the same process to avoid contention over ipfs repo lock --- cmd/superNode.go | 61 +++++----- dockerfiles/super_node/Dockerfile | 4 + documentation/super_node/setup.md | 57 +++++++-- environments/btcSuperNode.toml | 36 ------ environments/ethSuperNode.toml | 26 ----- environments/superNode.toml | 59 ++++++++++ pkg/super_node/backfiller.go | 36 +++--- pkg/super_node/backfiller_test.go | 15 ++- pkg/super_node/eth/mocks/converter.go | 2 +- pkg/super_node/service.go | 46 ++------ pkg/super_node/service_test.go | 2 +- pkg/super_node/shared/chain_type.go | 13 +++ pkg/super_node/shared/config.go | 161 ++++++++++++++------------ 13 files changed, 278 insertions(+), 240 deletions(-) delete mode 100644 environments/btcSuperNode.toml delete mode 100644 environments/ethSuperNode.toml create mode 100644 environments/superNode.toml diff --git a/cmd/superNode.go b/cmd/superNode.go index 2d01fe6e..0ba43996 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -18,6 +18,8 @@ package cmd import ( "sync" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -53,48 +55,43 @@ func init() { } func superNode() { - superNode, superNodeConfig, err := newSuperNode() + superNodeConfigs, err := shared.NewSuperNodeConfigs() if err != nil { logWithCommand.Fatal(err) } + if err := ipfs.InitIPFSPlugins(); err != nil { + logWithCommand.Fatal(err) + } wg := &sync.WaitGroup{} - var forwardQuitChan chan bool - var forwardPayloadChan chan shared.StreamedIPLDs - if superNodeConfig.Serve { - forwardQuitChan = make(chan bool) - forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize) - superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) - if err := startServers(superNode, superNodeConfig); err != nil { - logWithCommand.Fatal(err) - } - } - if superNodeConfig.Sync { - if err := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan); err != nil { - logWithCommand.Fatal(err) - } - } - if superNodeConfig.BackFill { - backFiller, err := super_node.NewBackFillService(superNodeConfig) + for _, superNodeConfig := range superNodeConfigs { + superNode, err := super_node.NewSuperNode(superNodeConfig) if err != nil { logWithCommand.Fatal(err) } - backFiller.FillGaps(wg, nil) + var forwardPayloadChan chan shared.StreamedIPLDs + if superNodeConfig.Serve { + forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize) + superNode.ScreenAndServe(wg, forwardPayloadChan) + if err := startServers(superNode, superNodeConfig); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.Sync { + if err := superNode.SyncAndPublish(wg, forwardPayloadChan); err != nil { + logWithCommand.Fatal(err) + } + } + if superNodeConfig.BackFill { + backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) + if err != nil { + logWithCommand.Fatal(err) + } + backFiller.FillGaps(wg) + } } wg.Wait() } -func newSuperNode() (super_node.SuperNode, *shared.SuperNodeConfig, error) { - superNodeConfig, err := shared.NewSuperNodeConfig() - if err != nil { - return nil, nil, err - } - sn, err := super_node.NewSuperNode(superNodeConfig) - if err != nil { - return nil, nil, err - } - return sn, superNodeConfig, nil -} - func startServers(superNode 
super_node.SuperNode, settings *shared.SuperNodeConfig) error { _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs()) if err != nil { @@ -104,6 +101,6 @@ func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConf if err != nil { return err } - _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{"eth", "btc"}, nil, nil, rpc.HTTPTimeouts{}) + _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{}) return err } diff --git a/dockerfiles/super_node/Dockerfile b/dockerfiles/super_node/Dockerfile index 2c9dd0a7..06b55a83 100644 --- a/dockerfiles/super_node/Dockerfile +++ b/dockerfiles/super_node/Dockerfile @@ -37,6 +37,8 @@ ARG USER ARG CONFIG_FILE ARG EXPOSE_PORT_1 ARG EXPOSE_PORT_2 +ARG EXPOSE_PORT_3 +ARG EXPOSE_PORT_4 RUN adduser -D 5000 $USER USER $USER @@ -54,5 +56,7 @@ COPY --from=builder /go/src/github.com/ipfs/go-ipfs/ipfs ipfs EXPOSE $EXPOSE_PORT_1 EXPOSE $EXPOSE_PORT_2 +EXPOSE $EXPOSE_PORT_3 +EXPOSE $EXPOSE_PORT_4 CMD ["./startup_script.sh"] diff --git a/documentation/super_node/setup.md b/documentation/super_node/setup.md index 40b330f7..685b66c2 100644 --- a/documentation/super_node/setup.md +++ b/documentation/super_node/setup.md @@ -105,36 +105,69 @@ Usage: `./vulcanizedb superNode --config=` +The config file contains the parameters needed to initialize a super node with the appropriate chain(s), settings, and services +The below example spins up a super node for btc and eth ```toml [superNode] - chain = "ethereum" - ipfsPath = "/root/.ipfs" + chains = ["ethereum", "bitcoin"] + ipfsPath = "/Users/iannorden/.ipfs" - [superNode.database] - name = "vulcanize_public" + [superNode.ethereum.database] + name = "vulcanize_demo" hostname = "localhost" port = 5432 - user = "ec2-user" + user = "postgres" - [superNode.sync] + [superNode.ethereum.sync] on = true wsPath = "ws://127.0.0.1:8546" workers = 1 - [superNode.server] + [superNode.ethereum.server] on = true - ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" + ipcPath = "/Users/iannorden/.vulcanize/eth/vulcanize.ipc" wsPath = "127.0.0.1:8080" httpPath = "127.0.0.1:8081" - [superNode.backFill] + [superNode.ethereum.backFill] on = true httpPath = "http://127.0.0.1:8545" frequency = 15 batchSize = 50 + + [superNode.bitcoin.database] + name = "vulcanize_demo" + hostname = "localhost" + port = 5432 + user = "postgres" + + [superNode.bitcoin.sync] + on = true + wsPath = "127.0.0.1:8332" + workers = 1 + pass = "GhhOhxL6GxteDhgzrTqj" + user = "ocdrpc" + + [superNode.bitcoin.server] + on = true + ipcPath = "/Users/iannorden/.vulcanize/btc/vulcanize.ipc" + wsPath = "127.0.0.1:8082" + httpPath = "127.0.0.1:8083" + + [superNode.bitcoin.backFill] + on = true + httpPath = "127.0.0.1:8332" + frequency = 15 + batchSize = 50 + pass = "GhhOhxL6GxteDhgzrTqj" + user = "ocdrpc" + + [superNode.bitcoin.node] + nodeID = "ocd0" + clientName = "Omnicore" + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" + networkID = "0xD9B4BEF9" ``` ### Dockerfile Setup @@ -211,6 +244,6 @@ createdb vulcanize_public 8. Build and run the Docker image ``` cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node -docker build --build-arg CONFIG_FILE=environments/ethSuperNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 . 
+docker build --build-arg CONFIG_FILE=environments/superNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 --build-arg EXPOSE_PORT_3=8082 --build-arg EXPOSE_PORT_4=8083 . docker run --network host -e IPFS_INIT=true -e VDB_PG_NAME=vulcanize_public -e VDB_PG_HOSTNAME=localhost -e VDB_PG_PORT=5432 -e VDB_PG_USER=postgres -e VDB_PG_PASSWORD=password {IMAGE_ID} ``` \ No newline at end of file diff --git a/environments/btcSuperNode.toml b/environments/btcSuperNode.toml deleted file mode 100644 index de7f5e02..00000000 --- a/environments/btcSuperNode.toml +++ /dev/null @@ -1,36 +0,0 @@ -[superNode] - chain = "bitcoin" - ipfsPath = "/root/.ipfs" - - [superNode.database] - name = "vulcanize_public" - hostname = "localhost" - port = 5432 - user = "ec2-user" - - [superNode.sync] - on = true - wsPath = "127.0.0.1:8332" - workers = 1 - pass = "password" - user = "username" - - [superNode.server] - on = true - ipcPath = "/root/.vulcanize/btc/vulcanize.ipc" - wsPath = "127.0.0.1:8082" - httpPath = "127.0.0.1:8083" - - [superNode.backFill] - on = true - httpPath = "127.0.0.1:8332" - frequency = 15 - batchSize = 50 - pass = "password" - user = "username" - - [superNode.btc] - nodeID = "ocd0" - clientName = "Omnicore" - genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" - networkID = "0xD9B4BEF9" \ No newline at end of file diff --git a/environments/ethSuperNode.toml b/environments/ethSuperNode.toml deleted file mode 100644 index 1c3be4f7..00000000 --- a/environments/ethSuperNode.toml +++ /dev/null @@ -1,26 +0,0 @@ -[superNode] - chain = "ethereum" - ipfsPath = "/root/.ipfs" - - [superNode.database] - name = "vulcanize_public" - hostname = "localhost" - port = 5432 - user = "ec2-user" - - [superNode.sync] - on = true - wsPath = "ws://127.0.0.1:8546" - workers = 1 - - [superNode.server] - on = true - ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" - wsPath = "127.0.0.1:8080" - httpPath = "127.0.0.1:8081" - - [superNode.backFill] - on = true - httpPath = "http://127.0.0.1:8545" - frequency = 15 - batchSize = 50 \ No newline at end of file diff --git a/environments/superNode.toml b/environments/superNode.toml new file mode 100644 index 00000000..9c77db2a --- /dev/null +++ b/environments/superNode.toml @@ -0,0 +1,59 @@ +[superNode] + chains = ["ethereum", "bitcoin"] + ipfsPath = "/root/.ipfs" + + [superNode.ethereum.database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "ec2-user" + + [superNode.ethereum.sync] + on = true + wsPath = "ws://127.0.0.1:8546" + workers = 1 + + [superNode.ethereum.server] + on = true + ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" + wsPath = "127.0.0.1:8080" + httpPath = "127.0.0.1:8081" + + [superNode.ethereum.backFill] + on = true + httpPath = "http://127.0.0.1:8545" + frequency = 15 + batchSize = 50 + + [superNode.bitcoin.database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "ec2-user" + + [superNode.bitcoin.sync] + on = true + wsPath = "127.0.0.1:8332" + workers = 1 + pass = "password" + user = "username" + + [superNode.bitcoin.server] + on = true + ipcPath = "/root/.vulcanize/btc/vulcanize.ipc" + wsPath = "127.0.0.1:8082" + httpPath = "127.0.0.1:8083" + + [superNode.bitcoin.backFill] + on = true + httpPath = "127.0.0.1:8332" + frequency = 15 + batchSize = 50 + pass = "password" + user = "username" + + [superNode.bitcoin.node] + nodeID = "ocd0" + clientName = "Omnicore" + genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" + networkID = "0xD9B4BEF9" 
\ No newline at end of file diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index cde13139..4116e09b 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -36,7 +36,7 @@ const ( // BackFillInterface for filling in gaps in the super node type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node - FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) + FillGaps(wg *sync.WaitGroup) } // BackFillService for filling in gaps in the super node @@ -51,14 +51,18 @@ type BackFillService struct { Retriever shared.CIDRetriever // Interface for fetching payloads over at historical blocks; over http Fetcher shared.PayloadFetcher + // Channel for forwarding backfill payloads to the ScreenAndServe process + ScreenAndServeChan chan shared.StreamedIPLDs // Check frequency GapCheckFrequency time.Duration // Size of batch fetches BatchSize uint64 + // Channel for receiving quit signal + QuitChan chan bool } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, error) { +func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.StreamedIPLDs) (BackFillInterface, error) { publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) if err != nil { return nil, err @@ -84,26 +88,28 @@ func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, er batchSize = DefaultMaxBatchSize } return &BackFillService{ - Indexer: indexer, - Converter: converter, - Publisher: publisher, - Retriever: retriever, - Fetcher: fetcher, - GapCheckFrequency: settings.Frequency, - BatchSize: batchSize, + Indexer: indexer, + Converter: converter, + Publisher: publisher, + Retriever: retriever, + Fetcher: fetcher, + GapCheckFrequency: settings.Frequency, + BatchSize: batchSize, + ScreenAndServeChan: screenAndServeChan, + QuitChan: settings.Quit, }, nil } // FillGaps periodically checks for and fills in gaps in the super node db // this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed -func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { +func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup) { ticker := time.NewTicker(bfs.GapCheckFrequency) wg.Add(1) go func() { for { select { - case <-quitChan: + case <-bfs.QuitChan: log.Info("quiting FillGaps process") wg.Done() return @@ -191,9 +197,11 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan errChan <- err continue } - // make backfiller a part of super_node service and forward these - // ipldPayload the the regular publishAndIndex and screenAndServe channels - // this would allow us to stream backfilled data to subscribers + // If there is a ScreenAndServe process listening, forward payload to it + select { + case bfs.ScreenAndServeChan <- ipldPayload: + default: + } cidPayload, err := bfs.Publisher.Publish(ipldPayload) if err != nil { errChan <- err diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 6713919a..833adce8 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -58,6 +58,7 @@ var _ = Describe("BackFiller", func() { 101: mocks.MockStateDiffPayload, }, } + quitChan := make(chan bool, 1) backfiller := &super_node.BackFillService{ Indexer: mockCidRepo, Publisher: mockPublisher, @@ -66,10 +67,10 @@ var _ = Describe("BackFiller", func() { 
Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + QuitChan: quitChan, } wg := &sync.WaitGroup{} - quitChan := make(chan bool, 1) - backfiller.FillGaps(wg, quitChan) + backfiller.FillGaps(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) @@ -111,6 +112,7 @@ var _ = Describe("BackFiller", func() { 100: mocks.MockStateDiffPayload, }, } + quitChan := make(chan bool, 1) backfiller := &super_node.BackFillService{ Indexer: mockCidRepo, Publisher: mockPublisher, @@ -119,10 +121,10 @@ var _ = Describe("BackFiller", func() { Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + QuitChan: quitChan, } wg := &sync.WaitGroup{} - quitChan := make(chan bool, 1) - backfiller.FillGaps(wg, quitChan) + backfiller.FillGaps(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) @@ -158,6 +160,7 @@ var _ = Describe("BackFiller", func() { 2: mocks.MockStateDiffPayload, }, } + quitChan := make(chan bool, 1) backfiller := &super_node.BackFillService{ Indexer: mockCidRepo, Publisher: mockPublisher, @@ -166,10 +169,10 @@ var _ = Describe("BackFiller", func() { Retriever: mockRetriever, GapCheckFrequency: time.Second * 2, BatchSize: super_node.DefaultMaxBatchSize, + QuitChan: quitChan, } wg := &sync.WaitGroup{} - quitChan := make(chan bool, 1) - backfiller.FillGaps(wg, quitChan) + backfiller.FillGaps(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) diff --git a/pkg/super_node/eth/mocks/converter.go b/pkg/super_node/eth/mocks/converter.go index 428d0d0b..09326998 100644 --- a/pkg/super_node/eth/mocks/converter.go +++ b/pkg/super_node/eth/mocks/converter.go @@ -58,7 +58,7 @@ func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (share return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) } pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload) - if len(pc.PassedStatediffPayload) < pc.iteration+1 { + if len(pc.ReturnIPLDPayload) < pc.iteration+1 { return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration) } returnPayload := pc.ReturnIPLDPayload[pc.iteration] diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index e16a51be..943cc275 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -29,7 +29,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/eth/core" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -45,9 +44,9 @@ type SuperNode interface { // APIs(), Protocols(), Start() and Stop() node.Service // Main event loop for syncAndPublish processes - SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs, forwardQuitchan chan<- bool) error + SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error // Main event loop for handling client pub-sub - ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) + ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) // Method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, 
params shared.SubscriptionSettings) // Method to unsubscribe from state diff processing @@ -98,9 +97,6 @@ type Service struct { // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) { - if err := ipfs.InitIPFSPlugins(); err != nil { - return nil, err - } sn := new(Service) var err error // If we are syncing, initialize the needed interfaces @@ -175,10 +171,10 @@ func (sap *Service) APIs() []rpc.API { return append(apis, chainAPI) } -// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids +// SyncAndPublish is the backend processing loop which streams data, converts it to iplds, publishes them to ipfs, and indexes their cids // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop // which filters and sends relevant data to client subscriptions, if there are any -func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs, screenAndServeQuit chan<- bool) error { +func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { return err @@ -187,11 +183,10 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha // Channels for forwarding data to the publishAndIndex workers publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) - publishAndIndexQuit := make(chan bool, sap.WorkerPoolSize) // publishAndIndex worker pool to handle publishing and indexing concurrently, while // limiting the number of Postgres connections we can possibly open so as to prevent error for i := 0; i < sap.WorkerPoolSize; i++ { - sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit) + sap.publishAndIndex(i, publishAndIndexPayload) } go func() { for { @@ -208,25 +203,10 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha default: } // Forward the payload to the publishAndIndex workers - select { - case publishAndIndexPayload <- ipldPayload: - default: - } + publishAndIndexPayload <- ipldPayload case err := <-sub.Err(): log.Error(err) case <-sap.QuitChan: - // If we have a ScreenAndServe process running, forward the quit signal to it - select { - case screenAndServeQuit <- true: - default: - } - // Also forward a quit signal for each of the publishAndIndex workers - for i := 0; i < sap.WorkerPoolSize; i++ { - select { - case publishAndIndexQuit <- true: - default: - } - } log.Info("quiting SyncAndPublish process") wg.Done() return @@ -237,7 +217,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha return nil } -func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs, publishAndIndexQuit <-chan bool) { +func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs) { go func() { for { select { @@ -250,9 +230,6 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared if err := sap.Indexer.Index(cidPayload); err != nil { log.Errorf("worker %d error: %v", id, err) } - case <-publishAndIndexQuit: - log.Infof("quiting publishAndIndex worker %d", id) - return } } }() @@ -261,14 +238,14 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared // 
ScreenAndServe is the loop used to screen data streamed from the state diffing eth node // and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration -func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) { +func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) { wg.Add(1) go func() { for { select { case payload := <-screenAndServePayload: sap.sendResponse(payload) - case <-screenAndServeQuit: + case <-sap.QuitChan: log.Info("quiting ScreenAndServe process") wg.Done() return @@ -422,11 +399,10 @@ func (sap *Service) Start(*p2p.Server) error { log.Info("Starting super node service") wg := new(sync.WaitGroup) payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) - quitChan := make(chan bool, 1) - if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil { + if err := sap.SyncAndPublish(wg, payloadChan); err != nil { return err } - sap.ScreenAndServe(wg, payloadChan, quitChan) + sap.ScreenAndServe(wg, payloadChan) return nil } diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index 20deaa32..c4568ea6 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -63,7 +63,7 @@ var _ = Describe("Service", func() { QuitChan: quitChan, WorkerPoolSize: 1, } - err := processor.SyncAndPublish(wg, nil, nil) + err := processor.SyncAndPublish(wg, nil) Expect(err).ToNot(HaveOccurred()) time.Sleep(2 * time.Second) quitChan <- true diff --git a/pkg/super_node/shared/chain_type.go b/pkg/super_node/shared/chain_type.go index 3aad798c..90faf946 100644 --- a/pkg/super_node/shared/chain_type.go +++ b/pkg/super_node/shared/chain_type.go @@ -44,6 +44,19 @@ func (c ChainType) String() string { } } +func (c ChainType) API() string { + switch c { + case Ethereum: + return "eth" + case Bitcoin: + return "btc" + case Omni: + return "omni" + default: + return "" + } +} + func NewChainType(name string) (ChainType, error) { switch strings.ToLower(name) { case "ethereum", "eth": diff --git a/pkg/super_node/shared/config.go b/pkg/super_node/shared/config.go index ea330bc8..9b50a4a7 100644 --- a/pkg/super_node/shared/config.go +++ b/pkg/super_node/shared/config.go @@ -17,6 +17,7 @@ package shared import ( + "fmt" "os" "path/filepath" "time" @@ -61,21 +62,12 @@ type SuperNodeConfig struct { BatchSize uint64 } -// NewSuperNodeConfig is used to initialize a SuperNode config from a config .toml file -func NewSuperNodeConfig() (*SuperNodeConfig, error) { - sn := new(SuperNodeConfig) - sn.DBConfig = config.Database{ - Name: viper.GetString("superNode.database.name"), - Hostname: viper.GetString("superNode.database.hostname"), - Port: viper.GetInt("superNode.database.port"), - User: viper.GetString("superNode.database.user"), - Password: viper.GetString("superNode.database.password"), - } +// NewSuperNodeConfigs is used to initialize multiple SuperNode configs from a single config .toml file +// Separate chain supernode instances need to be run in the same process in order to avoid lock contention on the ipfs repository +func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) { + chains := viper.GetStringSlice("superNode.chains") + configs := make([]*SuperNodeConfig, len(chains)) var err error - sn.Chain, err = NewChainType(viper.GetString("superNode.chain")) - if err != nil { - return nil, err - } ipfsPath := viper.GetString("superNode.ipfsPath") if ipfsPath 
== "" { home, err := os.UserHomeDir() @@ -84,89 +76,104 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) { } ipfsPath = filepath.Join(home, ".ipfs") } - sn.IPFSPath = ipfsPath - sn.Serve = viper.GetBool("superNode.server.on") - sn.Sync = viper.GetBool("superNode.sync.on") - if sn.Sync { - workers := viper.GetInt("superNode.sync.workers") - if workers < 1 { - workers = 1 - } - sn.Workers = workers - switch sn.Chain { - case Ethereum: - sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.sync.wsPath")) - case Bitcoin: - sn.NodeInfo = core.Node{ - ID: viper.GetString("superNode.btc.nodeID"), - ClientName: viper.GetString("superNode.btc.clientName"), - GenesisBlock: viper.GetString("superNode.btc.genesisBlock"), - NetworkID: viper.GetString("superNode.btc.networkID"), - } - // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node - sn.WSClient = &rpcclient.ConnConfig{ - Host: viper.GetString("superNode.sync.wsPath"), - HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode - DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("superNode.sync.pass"), - User: viper.GetString("superNode.sync.user"), - } - } - } - if sn.Serve { - wsPath := viper.GetString("superNode.server.wsPath") - if wsPath == "" { - wsPath = "ws://127.0.0.1:8546" - } - sn.WSEndpoint = wsPath - ipcPath := viper.GetString("superNode.server.ipcPath") - if ipcPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") - } - sn.IPCEndpoint = ipcPath - httpPath := viper.GetString("superNode.server.httpPath") - if httpPath == "" { - httpPath = "http://127.0.0.1:8545" - } - sn.HTTPEndpoint = httpPath - } - db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo) - sn.DB = &db - sn.Quit = make(chan bool) - if viper.GetBool("superNode.backFill.on") { - if err := sn.BackFillFields(); err != nil { + for i, chain := range chains { + sn := new(SuperNodeConfig) + sn.Chain, err = NewChainType(chain) + if err != nil { return nil, err } + sn.DBConfig = config.Database{ + Name: viper.GetString(fmt.Sprintf("superNode.%s.database.name", chain)), + Hostname: viper.GetString(fmt.Sprintf("superNode.%s.database.hostname", chain)), + Port: viper.GetInt(fmt.Sprintf("superNode.%s.database.port", chain)), + User: viper.GetString(fmt.Sprintf("superNode.%s.database.user", chain)), + Password: viper.GetString(fmt.Sprintf("superNode.%s.database.password", chain)), + } + sn.IPFSPath = ipfsPath + sn.Serve = viper.GetBool(fmt.Sprintf("superNode.%s.server.on", chain)) + sn.Sync = viper.GetBool(fmt.Sprintf("superNode.%s.sync.on", chain)) + if sn.Sync { + workers := viper.GetInt("superNode.sync.workers") + if workers < 1 { + workers = 1 + } + sn.Workers = workers + switch sn.Chain { + case Ethereum: + sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.sync.wsPath")) + case Bitcoin: + sn.NodeInfo = core.Node{ + ID: viper.GetString("superNode.bitcoin.node.nodeID"), + ClientName: viper.GetString("superNode.bitcoin.node.clientName"), + GenesisBlock: viper.GetString("superNode.bitcoin.node.genesisBlock"), + NetworkID: viper.GetString("superNode.bitcoin.node.networkID"), + } + // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node + sn.WSClient = &rpcclient.ConnConfig{ + Host: viper.GetString("superNode.bitcoin.sync.wsPath"), + HTTPPostMode: true, // Bitcoin 
core only supports HTTP POST mode + DisableTLS: true, // Bitcoin core does not provide TLS by default + Pass: viper.GetString("superNode.bitcoin.sync.pass"), + User: viper.GetString("superNode.bitcoin.sync.user"), + } + } + } + if sn.Serve { + wsPath := viper.GetString(fmt.Sprintf("superNode.%s.server.wsPath", chain)) + if wsPath == "" { + wsPath = "ws://127.0.0.1:8546" + } + sn.WSEndpoint = wsPath + ipcPath := viper.GetString(fmt.Sprintf("superNode.%s.server.ipcPath", chain)) + if ipcPath == "" { + home, err := os.UserHomeDir() + if err != nil { + return nil, err + } + ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") + } + sn.IPCEndpoint = ipcPath + httpPath := viper.GetString(fmt.Sprintf("superNode.%s.server.httpPath", chain)) + if httpPath == "" { + httpPath = "http://127.0.0.1:8545" + } + sn.HTTPEndpoint = httpPath + } + db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo) + sn.DB = &db + sn.Quit = make(chan bool) + if viper.GetBool(fmt.Sprintf("superNode.%s.backFill.on", chain)) { + if err := sn.BackFillFields(chain); err != nil { + return nil, err + } + } + configs[i] = sn } - return sn, err + return configs, err } // BackFillFields is used to fill in the BackFill fields of the config -func (sn *SuperNodeConfig) BackFillFields() error { +func (sn *SuperNodeConfig) BackFillFields(chain string) error { sn.BackFill = true var httpClient interface{} var err error switch sn.Chain { case Ethereum: - _, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.backFill.httpPath")) + _, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.backFill.httpPath")) if err != nil { return err } case Bitcoin: httpClient = &rpcclient.ConnConfig{ - Host: viper.GetString("superNode.backFill.httpPath"), + Host: viper.GetString("superNode.bitcoin.backFill.httpPath"), HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode DisableTLS: true, // Bitcoin core does not provide TLS by default - Pass: viper.GetString("superNode.backFill.pass"), - User: viper.GetString("superNode.backFill.user"), + Pass: viper.GetString("superNode.bitcoin.backFill.pass"), + User: viper.GetString("superNode.bitcoin.backFill.user"), } } sn.HTTPClient = httpClient - freq := viper.GetInt("superNode.backFill.frequency") + freq := viper.GetInt(fmt.Sprintf("superNode.%s.backFill.frequency", chain)) var frequency time.Duration if freq <= 0 { frequency = time.Second * 30 @@ -174,7 +181,7 @@ func (sn *SuperNodeConfig) BackFillFields() error { frequency = time.Second * time.Duration(freq) } sn.Frequency = frequency - sn.BatchSize = uint64(viper.GetInt64("superNode.backFill.batchSize")) + sn.BatchSize = uint64(viper.GetInt64(fmt.Sprintf("superNode.%s.backFill.batchSize", chain))) return nil }
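Note (supplementary, not part of the patch): the core of this change is deriving chain-scoped viper keys (e.g. `superNode.ethereum.sync.wsPath`) from a single shared config file instead of loading one file per chain. Below is a minimal, self-contained Go sketch of that lookup pattern; the inline TOML sample and its values are illustrative only, not taken from the repository.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

// Inline stand-in for environments/superNode.toml; values are hypothetical.
const sampleConfig = `
[superNode]
  chains = ["ethereum", "bitcoin"]

  [superNode.ethereum.sync]
    wsPath = "ws://127.0.0.1:8546"

  [superNode.bitcoin.sync]
    wsPath = "127.0.0.1:8332"
`

func main() {
	viper.SetConfigType("toml")
	if err := viper.ReadConfig(bytes.NewBufferString(sampleConfig)); err != nil {
		panic(err)
	}
	// Mirrors how NewSuperNodeConfigs builds per-chain keys with fmt.Sprintf
	// and returns one config for each entry in superNode.chains.
	for _, chain := range viper.GetStringSlice("superNode.chains") {
		key := fmt.Sprintf("superNode.%s.sync.wsPath", chain)
		fmt.Printf("%s -> %s\n", chain, viper.GetString(key))
	}
}
```

Because every resulting instance runs in the same process (and therefore shares one ipfs repo lock), the loop in cmd/superNode.go initializes the IPFS plugins once and then starts a super node service per returned config.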