worker pool for handling concurrent ipld putting and cid indexing without overloading Postgres connections

Ian Norden 2019-06-25 15:31:14 -05:00
parent 3fa33fb767
commit f2efbb5d01
5 changed files with 78 additions and 52 deletions
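Note on the pattern: the service change below is a standard bounded worker pool. A fixed number of goroutines drain a buffered channel, so no more than workerPoolSize publish-and-index operations (and therefore Postgres transactions) are in flight at once, regardless of how fast payloads stream in. A minimal, self-contained sketch of the idea in isolation (the payload type, channel sizes, and worker count here are illustrative, not the project's types):

package main

import (
	"fmt"
	"sync"
)

// payload stands in for the service's IPLD payload type; illustrative only.
type payload struct{ blockNumber int }

const poolSize = 4 // cap on how many publish-and-index operations (and DB transactions) run at once

func main() {
	jobs := make(chan payload, 100)   // buffered so the streaming loop rarely blocks
	quit := make(chan bool, poolSize) // one quit signal per worker
	var wg sync.WaitGroup

	// Fixed-size worker pool: concurrency is bounded by poolSize, not by the payload rate.
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case p := <-jobs:
					// publishing to IPFS and indexing CIDs in Postgres would happen here
					fmt.Printf("worker %d handled block %d\n", id, p.blockNumber)
				case <-quit:
					return
				}
			}
		}(i)
	}

	// Producer side: forward payloads without blocking, mirroring the commit's select/default sends.
	for n := 0; n < 10; n++ {
		select {
		case jobs <- payload{blockNumber: n}:
		default: // workers saturated and buffer full; payload is skipped
		}
	}

	// Shutdown: one quit signal per worker (payloads still buffered may be skipped, as in the commit).
	for i := 0; i < poolSize; i++ {
		quit <- true
	}
	wg.Wait()
}

With workerPoolSize set to 1, as in this commit, the pool degenerates to a single background worker; raising the constant scales concurrency without changing the rest of the event loop.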

View File

@@ -73,7 +73,7 @@ func syncPublishScreenAndServe() {
 	if err != nil {
 		log.Fatal(err)
 	}
-	processor.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan)
+	processor.ScreenAndServe(forwardPayloadChan, forwardQuitChan)
 	var ipcPath string
 	ipcPath = viper.GetString("server.ipcPath")

View File

@ -7,7 +7,7 @@ RUN apk add busybox-extras
# Get and build vulcanizedb syncAndPublish fork # Get and build vulcanizedb syncAndPublish fork
RUN go get -u -d github.com/vulcanize/vulcanizedb RUN go get -u -d github.com/vulcanize/vulcanizedb
WORKDIR /go/src/github.com/vulcanize/vulcanizedb WORKDIR /go/src/github.com/vulcanize/vulcanizedb
RUN git checkout syncAndPublish RUN git checkout ipfs_concurrency
RUN dep ensure RUN dep ensure
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o vulcanizedb . RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o vulcanizedb .

View File

@@ -40,7 +40,6 @@ type IPLDPublisher interface {
 // Publisher is the underlying struct for the IPLDPublisher interface
 type Publisher struct {
-	Node              *ipfs.IPFS
 	HeaderPutter      *eth_block_header.BlockHeaderDagPutter
 	TransactionPutter *eth_block_transactions.BlockTransactionsDagPutter
 	ReceiptPutter     *eth_block_receipts.EthBlockReceiptDagPutter
@@ -67,7 +66,6 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) {
 		return nil, err
 	}
 	return &Publisher{
-		Node:              node,
 		HeaderPutter:      eth_block_header.NewBlockHeaderDagPutter(node, rlp.RlpDecoder{}),
 		TransactionPutter: eth_block_transactions.NewBlockTransactionsDagPutter(node),
 		ReceiptPutter:     eth_block_receipts.NewEthBlockReceiptDagPutter(node),

View File

@@ -42,7 +42,10 @@ func NewCIDRepository(db *postgres.DB) *Repository {
 // Index indexes a cidPayload in Postgres
 func (repo *Repository) Index(cidPayload *CIDPayload) error {
-	tx, _ := repo.db.Beginx()
+	tx, err := repo.db.Beginx()
+	if err != nil {
+		return err
+	}
 	headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex())
 	if err != nil {
 		tx.Rollback()
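The fix above stops ignoring the error from Beginx; for reference, the begin/rollback/commit idiom it moves toward looks roughly like this with sqlx (the table, column, and DSN below are placeholders, not vulcanizedb's actual schema or config):

package main

import (
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // Postgres driver
)

// indexExample shows the begin/rollback/commit idiom: always check the error
// from Beginx before using the transaction, and roll back on any failure so
// the connection is returned to the pool.
func indexExample(db *sqlx.DB, blockNumber int64, cid string) error {
	tx, err := db.Beginx()
	if err != nil {
		return err // previously ignored; a nil tx here would panic on first use
	}
	if _, err := tx.Exec(`INSERT INTO example_cids (block_number, cid) VALUES ($1, $2)`, blockNumber, cid); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}

func main() {
	db, err := sqlx.Connect("postgres", "postgres://localhost:5432/example?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := indexExample(db, 1, "examplecid"); err != nil {
		log.Fatal(err)
	}
}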

View File

@@ -17,7 +17,6 @@
 package ipfs
 
 import (
-	"fmt"
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -35,6 +34,7 @@ import (
 )
 
 const payloadChanBufferSize = 20000 // the max eth sub buffer size
+const workerPoolSize = 1
 
 // SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing,
 // and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
@@ -45,7 +45,7 @@ type SyncPublishScreenAndServe interface {
 	// Main event loop for syncAndPublish processes
 	SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
 	// Main event loop for handling client pub-sub
-	ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
+	ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool)
 	// Method to subscribe to receive state diff processing output
 	Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription)
 	// Method to unsubscribe from state diff processing
@@ -128,12 +128,22 @@ func (sap *Service) APIs() []rpc.API {
 // SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
 // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
 // which filters and sends relevant data to client subscriptions, if there are any
-func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error {
+func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- IPLDPayload, screenAndServeQuit chan<- bool) error {
 	sub, err := sap.Streamer.Stream(sap.PayloadChan)
 	if err != nil {
 		return err
 	}
 	wg.Add(1)
+
+	// Channels for forwarding data to the publishAndIndex workers
+	publishAndIndexPayload := make(chan IPLDPayload, payloadChanBufferSize)
+	publishAndIndexQuit := make(chan bool, workerPoolSize)
+	// publishAndIndex worker pool to handle publishing and indexing concurrently, while
+	// limiting the number of Postgres connections we can possibly open so as to prevent error
+	for i := 0; i < workerPoolSize; i++ {
+		sap.publishAndIndex(i, publishAndIndexPayload, publishAndIndexQuit)
+	}
+
 	go func() {
 		for {
 			select {
@@ -149,26 +159,29 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
 				}
 				// If we have a ScreenAndServe process running, forward the payload to it
 				select {
-				case forwardPayloadChan <- *ipldPayload:
+				case screenAndServePayload <- *ipldPayload:
 				default:
 				}
-				cidPayload, err := sap.Publisher.Publish(ipldPayload)
-				if err != nil {
-					log.Error(err)
-					continue
-				}
-				err = sap.Repository.Index(cidPayload)
-				if err != nil {
-					log.Error(err)
+				// Forward the payload to the publishAndIndex workers
+				select {
+				case publishAndIndexPayload <- *ipldPayload:
+				default:
 				}
 			case err = <-sub.Err():
 				log.Error(err)
 			case <-sap.QuitChan:
 				// If we have a ScreenAndServe process running, forward the quit signal to it
 				select {
-				case forwardQuitchan <- true:
+				case screenAndServeQuit <- true:
 				default:
 				}
+				// Also forward a quit signal for each of the workers
+				for i := 0; i < workerPoolSize; i++ {
+					select {
+					case publishAndIndexQuit <- true:
+					default:
+					}
+				}
 				log.Info("quiting SyncAndPublish process")
 				wg.Done()
 				return
@@ -179,44 +192,71 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
 	return nil
 }
 
-// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
-// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
-func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
-	wg.Add(1)
+func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPayload, publishAndIndexQuit <-chan bool) {
 	go func() {
 		for {
 			select {
-			case payload := <-receivePayloadChan:
-				err := sap.processResponse(payload)
+			case payload := <-publishAndIndexPayload:
+				cidPayload, err := sap.Publisher.Publish(&payload)
 				if err != nil {
-					log.Error(err)
-					continue
+					log.Errorf("worker %d error: %v", id, err)
 				}
-			case <-receiveQuitchan:
-				log.Info("quiting ScreenAndServe process")
-				wg.Done()
+				err = sap.Repository.Index(cidPayload)
+				if err != nil {
+					log.Errorf("worker %d error: %v", id, err)
+				}
+			case <-publishAndIndexQuit:
+				log.Infof("quiting publishAndIndex worked %d", id)
 				return
 			}
 		}
 	}()
 }
 
-func (sap *Service) processResponse(payload IPLDPayload) error {
+// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
+// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
+func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool) {
+	go func() {
+		for {
+			select {
+			case payload := <-screenAndServePayload:
+				err := sap.sendResponse(payload)
+				if err != nil {
+					log.Error(err)
+				}
+			case <-screenAndServeQuit:
+				log.Info("quiting ScreenAndServe process")
+				return
+			}
+		}
+	}()
+}
+
+func (sap *Service) sendResponse(payload IPLDPayload) error {
+	sap.Lock()
 	for ty, subs := range sap.Subscriptions {
-		// Retreive the subscription paramaters for this subscription type
+		// Retrieve the subscription parameters for this subscription type
 		subConfig, ok := sap.SubscriptionTypes[ty]
 		if !ok {
-			return fmt.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+			log.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+			continue
 		}
 		response, err := sap.Screener.ScreenResponse(subConfig, payload)
 		if err != nil {
-			return err
+			log.Error(err)
+			continue
 		}
-		for id := range subs {
-			//TODO send payloads to this type of sub
-			sap.serve(id, *response, ty)
+		for id, sub := range subs {
+			select {
+			case sub.PayloadChan <- *response:
+				log.Infof("sending seed node payload to subscription %s", id)
+			default:
+				log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
+			}
 		}
 	}
+	sap.Unlock()
 	return nil
 }
@@ -308,7 +348,7 @@ func (sap *Service) Start(*p2p.Server) error {
 	if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
 		return err
 	}
-	sap.ScreenAndServe(wg, payloadChan, quitChan)
+	sap.ScreenAndServe(payloadChan, quitChan)
 	return nil
 }
@@ -319,21 +359,6 @@
 	return nil
 }
 
-// serve is used to send screened payloads to their requesting sub
-func (sap *Service) serve(id rpc.ID, payload ResponsePayload, ty common.Hash) {
-	sap.Lock()
-	sub, ok := sap.Subscriptions[ty][id]
-	if ok {
-		select {
-		case sub.PayloadChan <- payload:
-			log.Infof("sending seed node payload to subscription %s", id)
-		default:
-			log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
-		}
-	}
-	sap.Unlock()
-}
-
 // close is used to close all listening subscriptions
 func (sap *Service) close() {
 	sap.Lock()