diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go
index 75123327..1c23f698 100644
--- a/cmd/syncPublishScreenAndServe.go
+++ b/cmd/syncPublishScreenAndServe.go
@@ -73,7 +73,7 @@ func syncPublishScreenAndServe() {
 	if err != nil {
 		log.Fatal(err)
 	}
-	processor.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan)
+	processor.ScreenAndServe(forwardPayloadChan, forwardQuitChan)
 	var ipcPath string
 	ipcPath = viper.GetString("server.ipcPath")
diff --git a/dockerfiles/seed_node/Dockerfile b/dockerfiles/seed_node/Dockerfile
index fc809234..34e60ea5 100644
--- a/dockerfiles/seed_node/Dockerfile
+++ b/dockerfiles/seed_node/Dockerfile
@@ -7,7 +7,7 @@ RUN apk add busybox-extras
 
 # Get and build vulcanizedb syncAndPublish fork
 RUN go get -u -d github.com/vulcanize/vulcanizedb
 WORKDIR /go/src/github.com/vulcanize/vulcanizedb
-RUN git checkout syncAndPublish
+RUN git checkout ipfs_concurrency
 RUN dep ensure
 RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o vulcanizedb .
diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go
index 4d0158ef..0bbeaf47 100644
--- a/pkg/ipfs/publisher.go
+++ b/pkg/ipfs/publisher.go
@@ -40,7 +40,6 @@ type IPLDPublisher interface {
 
 // Publisher is the underlying struct for the IPLDPublisher interface
 type Publisher struct {
-	Node              *ipfs.IPFS
 	HeaderPutter      *eth_block_header.BlockHeaderDagPutter
 	TransactionPutter *eth_block_transactions.BlockTransactionsDagPutter
 	ReceiptPutter     *eth_block_receipts.EthBlockReceiptDagPutter
@@ -67,7 +66,6 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) {
 		return nil, err
 	}
 	return &Publisher{
-		Node:              node,
 		HeaderPutter:      eth_block_header.NewBlockHeaderDagPutter(node, rlp.RlpDecoder{}),
 		TransactionPutter: eth_block_transactions.NewBlockTransactionsDagPutter(node),
 		ReceiptPutter:     eth_block_receipts.NewEthBlockReceiptDagPutter(node),
diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go
index 317e8d78..5fd9b021 100644
--- a/pkg/ipfs/repository.go
+++ b/pkg/ipfs/repository.go
@@ -42,7 +42,10 @@ func NewCIDRepository(db *postgres.DB) *Repository {
 
 // Index indexes a cidPayload in Postgres
 func (repo *Repository) Index(cidPayload *CIDPayload) error {
-	tx, _ := repo.db.Beginx()
+	tx, err := repo.db.Beginx()
+	if err != nil {
+		return err
+	}
 	headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex())
 	if err != nil {
 		tx.Rollback()
diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go
index e16f4bb2..58657a1f 100644
--- a/pkg/ipfs/service.go
+++ b/pkg/ipfs/service.go
@@ -17,7 +17,6 @@
 package ipfs
 
 import (
-	"fmt"
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -35,6 +34,7 @@ import (
 )
 
 const payloadChanBufferSize = 20000 // the max eth sub buffer size
+const workerPoolSize = 1
 
 // SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing,
 // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
@@ -45,7 +45,7 @@ type SyncPublishScreenAndServe interface {
 	// Main event loop for syncAndPublish processes
 	SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
 	// Main event loop for handling client pub-sub
-	ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
+	ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool)
 	// Method to subscribe to receive state diff processing output
 	Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription)
 	// Method to unsubscribe from state diff processing
@@ -128,12 +128,22 @@ func (sap *Service) APIs() []rpc.API {
 // SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
 // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
 // which filters and sends relevant data to client subscriptions, if there are any
-func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error {
+func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- IPLDPayload, screenAndServeQuit chan<- bool) error {
 	sub, err := sap.Streamer.Stream(sap.PayloadChan)
 	if err != nil {
 		return err
 	}
 	wg.Add(1)
+
+	// Channels for forwarding data to the publishAndIndex workers
+	publishAndIndexPayload := make(chan IPLDPayload, payloadChanBufferSize)
+	publishAndIndexQuit := make(chan bool, workerPoolSize)
+	// publishAndIndex worker pool to handle publishing and indexing concurrently, while
+	// limiting the number of Postgres connections we can possibly open so as to prevent error
+	for i := 0; i < workerPoolSize; i++ {
+		sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit)
+	}
+
 	go func() {
 		for {
 			select {
@@ -149,26 +159,29 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
 				}
 				// If we have a ScreenAndServe process running, forward the payload to it
 				select {
-				case forwardPayloadChan <- *ipldPayload:
+				case screenAndServePayload <- *ipldPayload:
 				default:
 				}
-				cidPayload, err := sap.Publisher.Publish(ipldPayload)
-				if err != nil {
-					log.Error(err)
-					continue
-				}
-				err = sap.Repository.Index(cidPayload)
-				if err != nil {
-					log.Error(err)
+				// Forward the payload to the publishAndIndex workers
+				select {
+				case publishAndIndexPayload <- *ipldPayload:
+				default:
 				}
 			case err = <-sub.Err():
 				log.Error(err)
 			case <-sap.QuitChan:
 				// If we have a ScreenAndServe process running, forward the quit signal to it
 				select {
-				case forwardQuitchan <- true:
+				case screenAndServeQuit <- true:
 				default:
 				}
+				// Also forward a quit signal for each of the workers
+				for i := 0; i < workerPoolSize; i++ {
+					select {
+					case publishAndIndexQuit <- true:
+					default:
+					}
+				}
 				log.Info("quiting SyncAndPublish process")
 				wg.Done()
 				return
@@ -179,44 +192,71 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
 	return nil
 }
 
-// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
-// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
-func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
-	wg.Add(1)
+func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPayload, publishAndIndexQuit <-chan bool) {
 	go func() {
 		for {
 			select {
-			case payload := <-receivePayloadChan:
-				err := sap.processResponse(payload)
+			case payload := <-publishAndIndexPayload:
+				cidPayload, err := sap.Publisher.Publish(&payload)
 				if err != nil {
-					log.Error(err)
+					log.Errorf("worker %d error: %v", id, err)
+					continue
 				}
-			case <-receiveQuitchan:
-				log.Info("quiting ScreenAndServe process")
-				wg.Done()
+				err = sap.Repository.Index(cidPayload)
+				if err != nil {
+					log.Errorf("worker %d error: %v", id, err)
+				}
+			case <-publishAndIndexQuit:
+				log.Infof("quiting publishAndIndex worker %d", id)
 				return
 			}
 		}
 	}()
 }
 
-func (sap *Service) processResponse(payload IPLDPayload) error {
+// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
+// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
+func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool) {
+	go func() {
+		for {
+			select {
+			case payload := <-screenAndServePayload:
+				err := sap.sendResponse(payload)
+				if err != nil {
+					log.Error(err)
+				}
+			case <-screenAndServeQuit:
+				log.Info("quiting ScreenAndServe process")
+				return
+			}
+		}
+	}()
+}
+
+func (sap *Service) sendResponse(payload IPLDPayload) error {
+	sap.Lock()
 	for ty, subs := range sap.Subscriptions {
-		// Retreive the subscription paramaters for this subscription type
+		// Retrieve the subscription parameters for this subscription type
 		subConfig, ok := sap.SubscriptionTypes[ty]
 		if !ok {
-			return fmt.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+			log.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+			continue
 		}
 		response, err := sap.Screener.ScreenResponse(subConfig, payload)
 		if err != nil {
-			return err
+			log.Error(err)
+			continue
 		}
-		for id := range subs {
-			//TODO send payloads to this type of sub
-			sap.serve(id, *response, ty)
-
+		for id, sub := range subs {
+			select {
+			case sub.PayloadChan <- *response:
+				log.Infof("sending seed node payload to subscription %s", id)
+			default:
+				log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
+			}
 		}
 	}
+	sap.Unlock()
 	return nil
 }
@@ -308,7 +348,7 @@ func (sap *Service) Start(*p2p.Server) error {
 	if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
 		return err
 	}
-	sap.ScreenAndServe(wg, payloadChan, quitChan)
+	sap.ScreenAndServe(payloadChan, quitChan)
 	return nil
 }
 
@@ -319,21 +359,6 @@ func (sap *Service) Stop() error {
 	return nil
 }
 
-// serve is used to send screened payloads to their requesting sub
-func (sap *Service) serve(id rpc.ID, payload ResponsePayload, ty common.Hash) {
-	sap.Lock()
-	sub, ok := sap.Subscriptions[ty][id]
-	if ok {
-		select {
-		case sub.PayloadChan <- payload:
-			log.Infof("sending seed node payload to subscription %s", id)
-		default:
-			log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
-		}
-	}
-	sap.Unlock()
-}
-
 // close is used to close all listening subscriptions
 func (sap *Service) close() {
 	sap.Lock()
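For reviewers, the essence of the service.go changes is a bounded worker pool: SyncAndPublish no longer publishes and indexes inline, but feeds a buffered channel drained by workerPoolSize publishAndIndex goroutines, each of which also listens on its own buffered quit channel. The following is a minimal standalone sketch of that pattern, not the vulcanizedb code itself; the type and function names (payload, publishAndIndex, main) are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// workerPoolSize bounds concurrent publish/index work, which in the real service
// also bounds how many Postgres connections can be open at once.
const workerPoolSize = 1

// payload is a stand-in for the IPLDPayload forwarded by the sync loop (hypothetical type).
type payload struct{ blockNumber int }

// publishAndIndex mirrors the worker shape: drain payloads until a quit signal arrives.
func publishAndIndex(id int, wg *sync.WaitGroup, payloads <-chan payload, quit <-chan bool) {
	defer wg.Done()
	for {
		select {
		case p := <-payloads:
			fmt.Printf("worker %d publishing and indexing block %d\n", id, p.blockNumber)
		case <-quit:
			fmt.Printf("quitting worker %d\n", id)
			return
		}
	}
}

func main() {
	payloads := make(chan payload, 10)      // buffered so the producer rarely blocks
	quit := make(chan bool, workerPoolSize) // one quit slot per worker

	var wg sync.WaitGroup
	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go publishAndIndex(i, &wg, payloads, quit)
	}

	// Producer side: non-blocking sends, mirroring the select/default in SyncAndPublish,
	// so a saturated pool drops work instead of stalling the sync loop.
	for n := 0; n < 3; n++ {
		select {
		case payloads <- payload{blockNumber: n}:
		default:
		}
	}

	// Shutdown: one quit signal per worker, then wait for the workers to exit.
	for i := 0; i < workerPoolSize; i++ {
		quit <- true
	}
	wg.Wait()
}
```

As in the patch itself, the worker's select does not prioritize pending payloads over the quit signal, so work still in the channel may be dropped at shutdown; that matches the best-effort semantics of the non-blocking sends on the producer side.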