separate super node instances for different chains need to be within the same process to avoid contention over ipfs repo lock

This commit is contained in:
Ian Norden 2020-02-13 16:50:56 -06:00
parent ef3b043f97
commit 48fb5bcd27
13 changed files with 278 additions and 240 deletions

View File

@ -18,6 +18,8 @@ package cmd
import ( import (
"sync" "sync"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -53,48 +55,43 @@ func init() {
} }
func superNode() { func superNode() {
superNode, superNodeConfig, err := newSuperNode() superNodeConfigs, err := shared.NewSuperNodeConfigs()
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
var forwardQuitChan chan bool for _, superNodeConfig := range superNodeConfigs {
superNode, err := super_node.NewSuperNode(superNodeConfig)
if err != nil {
logWithCommand.Fatal(err)
}
var forwardPayloadChan chan shared.StreamedIPLDs var forwardPayloadChan chan shared.StreamedIPLDs
if superNodeConfig.Serve { if superNodeConfig.Serve {
forwardQuitChan = make(chan bool)
forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize) forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize)
superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) superNode.ScreenAndServe(wg, forwardPayloadChan)
if err := startServers(superNode, superNodeConfig); err != nil { if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} }
if superNodeConfig.Sync { if superNodeConfig.Sync {
if err := superNode.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan); err != nil { if err := superNode.SyncAndPublish(wg, forwardPayloadChan); err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
} }
if superNodeConfig.BackFill { if superNodeConfig.BackFill {
backFiller, err := super_node.NewBackFillService(superNodeConfig) backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
backFiller.FillGaps(wg, nil) backFiller.FillGaps(wg)
}
} }
wg.Wait() wg.Wait()
} }
func newSuperNode() (super_node.SuperNode, *shared.SuperNodeConfig, error) {
superNodeConfig, err := shared.NewSuperNodeConfig()
if err != nil {
return nil, nil, err
}
sn, err := super_node.NewSuperNode(superNodeConfig)
if err != nil {
return nil, nil, err
}
return sn, superNodeConfig, nil
}
func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConfig) error { func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConfig) error {
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs()) _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
if err != nil { if err != nil {
@ -104,6 +101,6 @@ func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConf
if err != nil { if err != nil {
return err return err
} }
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{"eth", "btc"}, nil, nil, rpc.HTTPTimeouts{}) _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
return err return err
} }

View File

@ -37,6 +37,8 @@ ARG USER
ARG CONFIG_FILE ARG CONFIG_FILE
ARG EXPOSE_PORT_1 ARG EXPOSE_PORT_1
ARG EXPOSE_PORT_2 ARG EXPOSE_PORT_2
ARG EXPOSE_PORT_3
ARG EXPOSE_PORT_4
RUN adduser -D 5000 $USER RUN adduser -D 5000 $USER
USER $USER USER $USER
@ -54,5 +56,7 @@ COPY --from=builder /go/src/github.com/ipfs/go-ipfs/ipfs ipfs
EXPOSE $EXPOSE_PORT_1 EXPOSE $EXPOSE_PORT_1
EXPOSE $EXPOSE_PORT_2 EXPOSE $EXPOSE_PORT_2
EXPOSE $EXPOSE_PORT_3
EXPOSE $EXPOSE_PORT_4
CMD ["./startup_script.sh"] CMD ["./startup_script.sh"]

View File

@ -105,36 +105,69 @@ Usage:
`./vulcanizedb superNode --config=<config_file.toml` `./vulcanizedb superNode --config=<config_file.toml`
The config file contains the parameters needed to initialize a SuperNode with the appropriate chain, settings, and services The config file contains the parameters needed to initialize a super node with the appropriate chain(s), settings, and services
`./vulcanizedb syncAndPublish --config=<config_file.toml>`
The below example spins up a super node for btc and eth
```toml ```toml
[superNode] [superNode]
chain = "ethereum" chains = ["ethereum", "bitcoin"]
ipfsPath = "/root/.ipfs" ipfsPath = "/Users/iannorden/.ipfs"
[superNode.database] [superNode.ethereum.database]
name = "vulcanize_public" name = "vulcanize_demo"
hostname = "localhost" hostname = "localhost"
port = 5432 port = 5432
user = "ec2-user" user = "postgres"
[superNode.sync] [superNode.ethereum.sync]
on = true on = true
wsPath = "ws://127.0.0.1:8546" wsPath = "ws://127.0.0.1:8546"
workers = 1 workers = 1
[superNode.server] [superNode.ethereum.server]
on = true on = true
ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" ipcPath = "/Users/iannorden/.vulcanize/eth/vulcanize.ipc"
wsPath = "127.0.0.1:8080" wsPath = "127.0.0.1:8080"
httpPath = "127.0.0.1:8081" httpPath = "127.0.0.1:8081"
[superNode.backFill] [superNode.ethereum.backFill]
on = true on = true
httpPath = "http://127.0.0.1:8545" httpPath = "http://127.0.0.1:8545"
frequency = 15 frequency = 15
batchSize = 50 batchSize = 50
[superNode.bitcoin.database]
name = "vulcanize_demo"
hostname = "localhost"
port = 5432
user = "postgres"
[superNode.bitcoin.sync]
on = true
wsPath = "127.0.0.1:8332"
workers = 1
pass = "GhhOhxL6GxteDhgzrTqj"
user = "ocdrpc"
[superNode.bitcoin.server]
on = true
ipcPath = "/Users/iannorden/.vulcanize/btc/vulcanize.ipc"
wsPath = "127.0.0.1:8082"
httpPath = "127.0.0.1:8083"
[superNode.bitcoin.backFill]
on = true
httpPath = "127.0.0.1:8332"
frequency = 15
batchSize = 50
pass = "GhhOhxL6GxteDhgzrTqj"
user = "ocdrpc"
[superNode.bitcoin.node]
nodeID = "ocd0"
clientName = "Omnicore"
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
networkID = "0xD9B4BEF9"
``` ```
### Dockerfile Setup ### Dockerfile Setup
@ -211,6 +244,6 @@ createdb vulcanize_public
8. Build and run the Docker image 8. Build and run the Docker image
``` ```
cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node
docker build --build-arg CONFIG_FILE=environments/ethSuperNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 . docker build --build-arg CONFIG_FILE=environments/superNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 EXPOSE_PORT_3=8082 --build-arg EXPOSE_PORT_4=8083 .
docker run --network host -e IPFS_INIT=true -e VDB_PG_NAME=vulcanize_public -e VDB_PG_HOSTNAME=localhost -e VDB_PG_PORT=5432 -e VDB_PG_USER=postgres -e VDB_PG_PASSWORD=password {IMAGE_ID} docker run --network host -e IPFS_INIT=true -e VDB_PG_NAME=vulcanize_public -e VDB_PG_HOSTNAME=localhost -e VDB_PG_PORT=5432 -e VDB_PG_USER=postgres -e VDB_PG_PASSWORD=password {IMAGE_ID}
``` ```

View File

@ -1,36 +0,0 @@
[superNode]
chain = "bitcoin"
ipfsPath = "/root/.ipfs"
[superNode.database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "ec2-user"
[superNode.sync]
on = true
wsPath = "127.0.0.1:8332"
workers = 1
pass = "password"
user = "username"
[superNode.server]
on = true
ipcPath = "/root/.vulcanize/btc/vulcanize.ipc"
wsPath = "127.0.0.1:8082"
httpPath = "127.0.0.1:8083"
[superNode.backFill]
on = true
httpPath = "127.0.0.1:8332"
frequency = 15
batchSize = 50
pass = "password"
user = "username"
[superNode.btc]
nodeID = "ocd0"
clientName = "Omnicore"
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
networkID = "0xD9B4BEF9"

View File

@ -1,26 +0,0 @@
[superNode]
chain = "ethereum"
ipfsPath = "/root/.ipfs"
[superNode.database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "ec2-user"
[superNode.sync]
on = true
wsPath = "ws://127.0.0.1:8546"
workers = 1
[superNode.server]
on = true
ipcPath = "/root/.vulcanize/eth/vulcanize.ipc"
wsPath = "127.0.0.1:8080"
httpPath = "127.0.0.1:8081"
[superNode.backFill]
on = true
httpPath = "http://127.0.0.1:8545"
frequency = 15
batchSize = 50

View File

@ -0,0 +1,59 @@
[superNode]
chains = ["ethereum", "bitcoin"]
ipfsPath = "/root/.ipfs"
[superNode.ethereum.database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "ec2-user"
[superNode.ethereum.sync]
on = true
wsPath = "ws://127.0.0.1:8546"
workers = 1
[superNode.ethereum.server]
on = true
ipcPath = "/root/.vulcanize/eth/vulcanize.ipc"
wsPath = "127.0.0.1:8080"
httpPath = "127.0.0.1:8081"
[superNode.ethereum.backFill]
on = true
httpPath = "http://127.0.0.1:8545"
frequency = 15
batchSize = 50
[superNode.bitcoin.database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "ec2-user"
[superNode.bitcoin.sync]
on = true
wsPath = "127.0.0.1:8332"
workers = 1
pass = "password"
user = "username"
[superNode.bitcoin.server]
on = true
ipcPath = "/root/.vulcanize/btc/vulcanize.ipc"
wsPath = "127.0.0.1:8082"
httpPath = "127.0.0.1:8083"
[superNode.bitcoin.backFill]
on = true
httpPath = "127.0.0.1:8332"
frequency = 15
batchSize = 50
pass = "password"
user = "username"
[superNode.bitcoin.node]
nodeID = "ocd0"
clientName = "Omnicore"
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
networkID = "0xD9B4BEF9"

View File

@ -36,7 +36,7 @@ const (
// BackFillInterface for filling in gaps in the super node // BackFillInterface for filling in gaps in the super node
type BackFillInterface interface { type BackFillInterface interface {
// Method for the super node to periodically check for and fill in gaps in its data using an archival node // Method for the super node to periodically check for and fill in gaps in its data using an archival node
FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) FillGaps(wg *sync.WaitGroup)
} }
// BackFillService for filling in gaps in the super node // BackFillService for filling in gaps in the super node
@ -51,14 +51,18 @@ type BackFillService struct {
Retriever shared.CIDRetriever Retriever shared.CIDRetriever
// Interface for fetching payloads over at historical blocks; over http // Interface for fetching payloads over at historical blocks; over http
Fetcher shared.PayloadFetcher Fetcher shared.PayloadFetcher
// Channel for forwarding backfill payloads to the ScreenAndServe process
ScreenAndServeChan chan shared.StreamedIPLDs
// Check frequency // Check frequency
GapCheckFrequency time.Duration GapCheckFrequency time.Duration
// Size of batch fetches // Size of batch fetches
BatchSize uint64 BatchSize uint64
// Channel for receiving quit signal
QuitChan chan bool
} }
// NewBackFillService returns a new BackFillInterface // NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, error) { func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.StreamedIPLDs) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath)
if err != nil { if err != nil {
return nil, err return nil, err
@ -91,19 +95,21 @@ func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, er
Fetcher: fetcher, Fetcher: fetcher,
GapCheckFrequency: settings.Frequency, GapCheckFrequency: settings.Frequency,
BatchSize: batchSize, BatchSize: batchSize,
ScreenAndServeChan: screenAndServeChan,
QuitChan: settings.Quit,
}, nil }, nil
} }
// FillGaps periodically checks for and fills in gaps in the super node db // FillGaps periodically checks for and fills in gaps in the super node db
// this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed // this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed
func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) { func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup) {
ticker := time.NewTicker(bfs.GapCheckFrequency) ticker := time.NewTicker(bfs.GapCheckFrequency)
wg.Add(1) wg.Add(1)
go func() { go func() {
for { for {
select { select {
case <-quitChan: case <-bfs.QuitChan:
log.Info("quiting FillGaps process") log.Info("quiting FillGaps process")
wg.Done() wg.Done()
return return
@ -191,9 +197,11 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan
errChan <- err errChan <- err
continue continue
} }
// make backfiller a part of super_node service and forward these // If there is a ScreenAndServe process listening, forward payload to it
// ipldPayload the the regular publishAndIndex and screenAndServe channels select {
// this would allow us to stream backfilled data to subscribers case bfs.ScreenAndServeChan <- ipldPayload:
default:
}
cidPayload, err := bfs.Publisher.Publish(ipldPayload) cidPayload, err := bfs.Publisher.Publish(ipldPayload)
if err != nil { if err != nil {
errChan <- err errChan <- err

View File

@ -58,6 +58,7 @@ var _ = Describe("BackFiller", func() {
101: mocks.MockStateDiffPayload, 101: mocks.MockStateDiffPayload,
}, },
} }
quitChan := make(chan bool, 1)
backfiller := &super_node.BackFillService{ backfiller := &super_node.BackFillService{
Indexer: mockCidRepo, Indexer: mockCidRepo,
Publisher: mockPublisher, Publisher: mockPublisher,
@ -66,10 +67,10 @@ var _ = Describe("BackFiller", func() {
Retriever: mockRetriever, Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2, GapCheckFrequency: time.Second * 2,
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
QuitChan: quitChan,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1) backfiller.FillGaps(wg)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
quitChan <- true quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
@ -111,6 +112,7 @@ var _ = Describe("BackFiller", func() {
100: mocks.MockStateDiffPayload, 100: mocks.MockStateDiffPayload,
}, },
} }
quitChan := make(chan bool, 1)
backfiller := &super_node.BackFillService{ backfiller := &super_node.BackFillService{
Indexer: mockCidRepo, Indexer: mockCidRepo,
Publisher: mockPublisher, Publisher: mockPublisher,
@ -119,10 +121,10 @@ var _ = Describe("BackFiller", func() {
Retriever: mockRetriever, Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2, GapCheckFrequency: time.Second * 2,
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
QuitChan: quitChan,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1) backfiller.FillGaps(wg)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
quitChan <- true quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
@ -158,6 +160,7 @@ var _ = Describe("BackFiller", func() {
2: mocks.MockStateDiffPayload, 2: mocks.MockStateDiffPayload,
}, },
} }
quitChan := make(chan bool, 1)
backfiller := &super_node.BackFillService{ backfiller := &super_node.BackFillService{
Indexer: mockCidRepo, Indexer: mockCidRepo,
Publisher: mockPublisher, Publisher: mockPublisher,
@ -166,10 +169,10 @@ var _ = Describe("BackFiller", func() {
Retriever: mockRetriever, Retriever: mockRetriever,
GapCheckFrequency: time.Second * 2, GapCheckFrequency: time.Second * 2,
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
QuitChan: quitChan,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
quitChan := make(chan bool, 1) backfiller.FillGaps(wg)
backfiller.FillGaps(wg, quitChan)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
quitChan <- true quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))

View File

@ -58,7 +58,7 @@ func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (share
return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload)
} }
pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload) pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload)
if len(pc.PassedStatediffPayload) < pc.iteration+1 { if len(pc.ReturnIPLDPayload) < pc.iteration+1 {
return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration) return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration)
} }
returnPayload := pc.ReturnIPLDPayload[pc.iteration] returnPayload := pc.ReturnIPLDPayload[pc.iteration]

View File

@ -29,7 +29,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
@ -45,9 +44,9 @@ type SuperNode interface {
// APIs(), Protocols(), Start() and Stop() // APIs(), Protocols(), Start() and Stop()
node.Service node.Service
// Main event loop for syncAndPublish processes // Main event loop for syncAndPublish processes
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs, forwardQuitchan chan<- bool) error SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error
// Main event loop for handling client pub-sub // Main event loop for handling client pub-sub
ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs)
// Method to subscribe to receive state diff processing output // Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
// Method to unsubscribe from state diff processing // Method to unsubscribe from state diff processing
@ -98,9 +97,6 @@ type Service struct {
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) { func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) {
if err := ipfs.InitIPFSPlugins(); err != nil {
return nil, err
}
sn := new(Service) sn := new(Service)
var err error var err error
// If we are syncing, initialize the needed interfaces // If we are syncing, initialize the needed interfaces
@ -175,10 +171,10 @@ func (sap *Service) APIs() []rpc.API {
return append(apis, chainAPI) return append(apis, chainAPI)
} }
// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids // SyncAndPublish is the backend processing loop which streams data, converts it to iplds, publishes them to ipfs, and indexes their cids
// This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
// which filters and sends relevant data to client subscriptions, if there are any // which filters and sends relevant data to client subscriptions, if there are any
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs, screenAndServeQuit chan<- bool) error { func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan) sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil { if err != nil {
return err return err
@ -187,11 +183,10 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
// Channels for forwarding data to the publishAndIndex workers // Channels for forwarding data to the publishAndIndex workers
publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize)
publishAndIndexQuit := make(chan bool, sap.WorkerPoolSize)
// publishAndIndex worker pool to handle publishing and indexing concurrently, while // publishAndIndex worker pool to handle publishing and indexing concurrently, while
// limiting the number of Postgres connections we can possibly open so as to prevent error // limiting the number of Postgres connections we can possibly open so as to prevent error
for i := 0; i < sap.WorkerPoolSize; i++ { for i := 0; i < sap.WorkerPoolSize; i++ {
sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit) sap.publishAndIndex(i, publishAndIndexPayload)
} }
go func() { go func() {
for { for {
@ -208,25 +203,10 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
default: default:
} }
// Forward the payload to the publishAndIndex workers // Forward the payload to the publishAndIndex workers
select { publishAndIndexPayload <- ipldPayload
case publishAndIndexPayload <- ipldPayload:
default:
}
case err := <-sub.Err(): case err := <-sub.Err():
log.Error(err) log.Error(err)
case <-sap.QuitChan: case <-sap.QuitChan:
// If we have a ScreenAndServe process running, forward the quit signal to it
select {
case screenAndServeQuit <- true:
default:
}
// Also forward a quit signal for each of the publishAndIndex workers
for i := 0; i < sap.WorkerPoolSize; i++ {
select {
case publishAndIndexQuit <- true:
default:
}
}
log.Info("quiting SyncAndPublish process") log.Info("quiting SyncAndPublish process")
wg.Done() wg.Done()
return return
@ -237,7 +217,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
return nil return nil
} }
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs, publishAndIndexQuit <-chan bool) { func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs) {
go func() { go func() {
for { for {
select { select {
@ -250,9 +230,6 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
if err := sap.Indexer.Index(cidPayload); err != nil { if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("worker %d error: %v", id, err) log.Errorf("worker %d error: %v", id, err)
} }
case <-publishAndIndexQuit:
log.Infof("quiting publishAndIndex worker %d", id)
return
} }
} }
}() }()
@ -261,14 +238,14 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node // ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration // and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) { func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) {
wg.Add(1) wg.Add(1)
go func() { go func() {
for { for {
select { select {
case payload := <-screenAndServePayload: case payload := <-screenAndServePayload:
sap.sendResponse(payload) sap.sendResponse(payload)
case <-screenAndServeQuit: case <-sap.QuitChan:
log.Info("quiting ScreenAndServe process") log.Info("quiting ScreenAndServe process")
wg.Done() wg.Done()
return return
@ -422,11 +399,10 @@ func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting super node service") log.Info("Starting super node service")
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize)
quitChan := make(chan bool, 1) if err := sap.SyncAndPublish(wg, payloadChan); err != nil {
if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
return err return err
} }
sap.ScreenAndServe(wg, payloadChan, quitChan) sap.ScreenAndServe(wg, payloadChan)
return nil return nil
} }

View File

@ -63,7 +63,7 @@ var _ = Describe("Service", func() {
QuitChan: quitChan, QuitChan: quitChan,
WorkerPoolSize: 1, WorkerPoolSize: 1,
} }
err := processor.SyncAndPublish(wg, nil, nil) err := processor.SyncAndPublish(wg, nil)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
quitChan <- true quitChan <- true

View File

@ -44,6 +44,19 @@ func (c ChainType) String() string {
} }
} }
func (c ChainType) API() string {
switch c {
case Ethereum:
return "eth"
case Bitcoin:
return "btc"
case Omni:
return "omni"
default:
return ""
}
}
func NewChainType(name string) (ChainType, error) { func NewChainType(name string) (ChainType, error) {
switch strings.ToLower(name) { switch strings.ToLower(name) {
case "ethereum", "eth": case "ethereum", "eth":

View File

@ -17,6 +17,7 @@
package shared package shared
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -61,21 +62,12 @@ type SuperNodeConfig struct {
BatchSize uint64 BatchSize uint64
} }
// NewSuperNodeConfig is used to initialize a SuperNode config from a config .toml file // NewSuperNodeConfigs is used to initialize multiple SuperNode configs from a single config .toml file
func NewSuperNodeConfig() (*SuperNodeConfig, error) { // Separate chain supernode instances need to be ran in the same process in order to avoid lock contention on the ipfs repository
sn := new(SuperNodeConfig) func NewSuperNodeConfigs() ([]*SuperNodeConfig, error) {
sn.DBConfig = config.Database{ chains := viper.GetStringSlice("superNode.chains")
Name: viper.GetString("superNode.database.name"), configs := make([]*SuperNodeConfig, len(chains))
Hostname: viper.GetString("superNode.database.hostname"),
Port: viper.GetInt("superNode.database.port"),
User: viper.GetString("superNode.database.user"),
Password: viper.GetString("superNode.database.password"),
}
var err error var err error
sn.Chain, err = NewChainType(viper.GetString("superNode.chain"))
if err != nil {
return nil, err
}
ipfsPath := viper.GetString("superNode.ipfsPath") ipfsPath := viper.GetString("superNode.ipfsPath")
if ipfsPath == "" { if ipfsPath == "" {
home, err := os.UserHomeDir() home, err := os.UserHomeDir()
@ -84,9 +76,22 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
} }
ipfsPath = filepath.Join(home, ".ipfs") ipfsPath = filepath.Join(home, ".ipfs")
} }
for i, chain := range chains {
sn := new(SuperNodeConfig)
sn.Chain, err = NewChainType(chain)
if err != nil {
return nil, err
}
sn.DBConfig = config.Database{
Name: viper.GetString(fmt.Sprintf("superNode.%s.database.name", chain)),
Hostname: viper.GetString(fmt.Sprintf("superNode.%s.database.hostname", chain)),
Port: viper.GetInt(fmt.Sprintf("superNode.%s.database.port", chain)),
User: viper.GetString(fmt.Sprintf("superNode.%s.database.user", chain)),
Password: viper.GetString(fmt.Sprintf("superNode.%s.database.password", chain)),
}
sn.IPFSPath = ipfsPath sn.IPFSPath = ipfsPath
sn.Serve = viper.GetBool("superNode.server.on") sn.Serve = viper.GetBool(fmt.Sprintf("superNode.%s.server.on", chain))
sn.Sync = viper.GetBool("superNode.sync.on") sn.Sync = viper.GetBool(fmt.Sprintf("superNode.%s.sync.on", chain))
if sn.Sync { if sn.Sync {
workers := viper.GetInt("superNode.sync.workers") workers := viper.GetInt("superNode.sync.workers")
if workers < 1 { if workers < 1 {
@ -95,31 +100,31 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
sn.Workers = workers sn.Workers = workers
switch sn.Chain { switch sn.Chain {
case Ethereum: case Ethereum:
sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.sync.wsPath")) sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.sync.wsPath"))
case Bitcoin: case Bitcoin:
sn.NodeInfo = core.Node{ sn.NodeInfo = core.Node{
ID: viper.GetString("superNode.btc.nodeID"), ID: viper.GetString("superNode.bitcoin.node.nodeID"),
ClientName: viper.GetString("superNode.btc.clientName"), ClientName: viper.GetString("superNode.bitcoin.node.clientName"),
GenesisBlock: viper.GetString("superNode.btc.genesisBlock"), GenesisBlock: viper.GetString("superNode.bitcoin.node.genesisBlock"),
NetworkID: viper.GetString("superNode.btc.networkID"), NetworkID: viper.GetString("superNode.bitcoin.node.networkID"),
} }
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node // For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
sn.WSClient = &rpcclient.ConnConfig{ sn.WSClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.sync.wsPath"), Host: viper.GetString("superNode.bitcoin.sync.wsPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.sync.pass"), Pass: viper.GetString("superNode.bitcoin.sync.pass"),
User: viper.GetString("superNode.sync.user"), User: viper.GetString("superNode.bitcoin.sync.user"),
} }
} }
} }
if sn.Serve { if sn.Serve {
wsPath := viper.GetString("superNode.server.wsPath") wsPath := viper.GetString(fmt.Sprintf("superNode.%s.server.wsPath", chain))
if wsPath == "" { if wsPath == "" {
wsPath = "ws://127.0.0.1:8546" wsPath = "ws://127.0.0.1:8546"
} }
sn.WSEndpoint = wsPath sn.WSEndpoint = wsPath
ipcPath := viper.GetString("superNode.server.ipcPath") ipcPath := viper.GetString(fmt.Sprintf("superNode.%s.server.ipcPath", chain))
if ipcPath == "" { if ipcPath == "" {
home, err := os.UserHomeDir() home, err := os.UserHomeDir()
if err != nil { if err != nil {
@ -128,7 +133,7 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc")
} }
sn.IPCEndpoint = ipcPath sn.IPCEndpoint = ipcPath
httpPath := viper.GetString("superNode.server.httpPath") httpPath := viper.GetString(fmt.Sprintf("superNode.%s.server.httpPath", chain))
if httpPath == "" { if httpPath == "" {
httpPath = "http://127.0.0.1:8545" httpPath = "http://127.0.0.1:8545"
} }
@ -137,36 +142,38 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo) db := utils.LoadPostgres(sn.DBConfig, sn.NodeInfo)
sn.DB = &db sn.DB = &db
sn.Quit = make(chan bool) sn.Quit = make(chan bool)
if viper.GetBool("superNode.backFill.on") { if viper.GetBool(fmt.Sprintf("superNode.%s.backFill.on", chain)) {
if err := sn.BackFillFields(); err != nil { if err := sn.BackFillFields(chain); err != nil {
return nil, err return nil, err
} }
} }
return sn, err configs[i] = sn
}
return configs, err
} }
// BackFillFields is used to fill in the BackFill fields of the config // BackFillFields is used to fill in the BackFill fields of the config
func (sn *SuperNodeConfig) BackFillFields() error { func (sn *SuperNodeConfig) BackFillFields(chain string) error {
sn.BackFill = true sn.BackFill = true
var httpClient interface{} var httpClient interface{}
var err error var err error
switch sn.Chain { switch sn.Chain {
case Ethereum: case Ethereum:
_, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.backFill.httpPath")) _, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.ethereum.backFill.httpPath"))
if err != nil { if err != nil {
return err return err
} }
case Bitcoin: case Bitcoin:
httpClient = &rpcclient.ConnConfig{ httpClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.backFill.httpPath"), Host: viper.GetString("superNode.bitcoin.backFill.httpPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.backFill.pass"), Pass: viper.GetString("superNode.bitcoin.backFill.pass"),
User: viper.GetString("superNode.backFill.user"), User: viper.GetString("superNode.bitcoin.backFill.user"),
} }
} }
sn.HTTPClient = httpClient sn.HTTPClient = httpClient
freq := viper.GetInt("superNode.backFill.frequency") freq := viper.GetInt(fmt.Sprintf("superNode.%s.backFill.frequency", chain))
var frequency time.Duration var frequency time.Duration
if freq <= 0 { if freq <= 0 {
frequency = time.Second * 30 frequency = time.Second * 30
@ -174,7 +181,7 @@ func (sn *SuperNodeConfig) BackFillFields() error {
frequency = time.Second * time.Duration(freq) frequency = time.Second * time.Duration(freq)
} }
sn.Frequency = frequency sn.Frequency = frequency
sn.BatchSize = uint64(viper.GetInt64("superNode.backFill.batchSize")) sn.BatchSize = uint64(viper.GetInt64(fmt.Sprintf("superNode.%s.backFill.batchSize", chain)))
return nil return nil
} }