forked from cerc-io/ipld-eth-server
make db fks deferrable so that we can commit entire cid payload in single transaction; adjust buffer sizes to optimize performane and stability
This commit is contained in:
parent
b1bb646ad5
commit
4baea2923c
@ -56,7 +56,7 @@ func streamSubscribe() {
|
|||||||
str := streamer.NewSeedStreamer(rpcClient)
|
str := streamer.NewSeedStreamer(rpcClient)
|
||||||
|
|
||||||
// Buffered channel for reading subscription payloads
|
// Buffered channel for reading subscription payloads
|
||||||
payloadChan := make(chan ipfs.ResponsePayload, 8000)
|
payloadChan := make(chan ipfs.ResponsePayload, 20000)
|
||||||
|
|
||||||
// Subscribe to the seed node service with the given config/filter parameters
|
// Subscribe to the seed node service with the given config/filter parameters
|
||||||
sub, err := str.Stream(payloadChan, subConfig)
|
sub, err := str.Stream(payloadChan, subConfig)
|
||||||
|
@ -50,15 +50,15 @@ func syncPublishScreenAndServe() {
|
|||||||
blockChain, ethClient, rpcClient := getBlockChainAndClients()
|
blockChain, ethClient, rpcClient := getBlockChainAndClients()
|
||||||
|
|
||||||
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
db := utils.LoadPostgres(databaseConfig, blockChain.Node())
|
||||||
quitChan := make(chan bool)
|
quitChan := make(chan bool, 1)
|
||||||
processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
|
processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg := &syn.WaitGroup{}
|
wg := &syn.WaitGroup{}
|
||||||
forwardPayloadChan := make(chan ipfs.IPLDPayload)
|
forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000)
|
||||||
forwardQuitChan := make(chan bool)
|
forwardQuitChan := make(chan bool, 1)
|
||||||
err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
|
err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
-- +goose Up
|
-- +goose Up
|
||||||
CREATE TABLE public.transaction_cids (
|
CREATE TABLE public.transaction_cids (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE,
|
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||||
tx_hash VARCHAR(66) NOT NULL,
|
tx_hash VARCHAR(66) NOT NULL,
|
||||||
cid TEXT NOT NULL,
|
cid TEXT NOT NULL,
|
||||||
dst VARCHAR(66) NOT NULL,
|
dst VARCHAR(66) NOT NULL,
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
-- +goose Up
|
-- +goose Up
|
||||||
CREATE TABLE public.receipt_cids (
|
CREATE TABLE public.receipt_cids (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE,
|
tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||||
cid TEXT NOT NULL,
|
cid TEXT NOT NULL,
|
||||||
topic0s VARCHAR(66)[]
|
topic0s VARCHAR(66)[]
|
||||||
);
|
);
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
-- +goose Up
|
-- +goose Up
|
||||||
CREATE TABLE public.state_cids (
|
CREATE TABLE public.state_cids (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE,
|
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||||
state_key VARCHAR(66) NOT NULL,
|
state_key VARCHAR(66) NOT NULL,
|
||||||
leaf BOOLEAN NOT NULL,
|
leaf BOOLEAN NOT NULL,
|
||||||
cid TEXT NOT NULL,
|
cid TEXT NOT NULL,
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
-- +goose Up
|
-- +goose Up
|
||||||
CREATE TABLE public.storage_cids (
|
CREATE TABLE public.storage_cids (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
state_id INTEGER NOT NULL REFERENCES state_cids (id) ON DELETE CASCADE,
|
state_id INTEGER NOT NULL REFERENCES state_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||||
storage_key VARCHAR(66) NOT NULL,
|
storage_key VARCHAR(66) NOT NULL,
|
||||||
leaf BOOLEAN NOT NULL,
|
leaf BOOLEAN NOT NULL,
|
||||||
cid TEXT NOT NULL,
|
cid TEXT NOT NULL,
|
||||||
|
@ -56,8 +56,8 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// subscribe to events from the SyncPublishScreenAndServe service
|
// subscribe to events from the SyncPublishScreenAndServe service
|
||||||
payloadChannel := make(chan ResponsePayload)
|
payloadChannel := make(chan ResponsePayload, payloadChanBufferSize)
|
||||||
quitChan := make(chan bool)
|
quitChan := make(chan bool, 1)
|
||||||
go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
|
go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
|
||||||
|
|
||||||
// loop and await state diff payloads and relay them to the subscriber with then notifier
|
// loop and await state diff payloads and relay them to the subscriber with then notifier
|
||||||
|
@ -147,13 +147,6 @@ func (pub *Publisher) publishHeaders(headerRLP []byte) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pub *Publisher) publishTransactions(blockBody *types.Body, trxMeta []*TrxMetaData) (map[common.Hash]*TrxMetaData, error) {
|
func (pub *Publisher) publishTransactions(blockBody *types.Body, trxMeta []*TrxMetaData) (map[common.Hash]*TrxMetaData, error) {
|
||||||
/*
|
|
||||||
println("publishing transactions")
|
|
||||||
for _, trx := range blockBody.Transactions {
|
|
||||||
println("trx value:")
|
|
||||||
println(trx.Value().Int64())
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
transactionCids, err := pub.TransactionPutter.DagPut(blockBody)
|
transactionCids, err := pub.TransactionPutter.DagPut(blockBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -206,7 +199,7 @@ func (pub *Publisher) publishStateNodes(stateNodes map[common.Hash]StateNode) (m
|
|||||||
func (pub *Publisher) publishStorageNodes(storageNodes map[common.Hash][]StorageNode) (map[common.Hash][]StorageNodeCID, error) {
|
func (pub *Publisher) publishStorageNodes(storageNodes map[common.Hash][]StorageNode) (map[common.Hash][]StorageNodeCID, error) {
|
||||||
storageLeafCids := make(map[common.Hash][]StorageNodeCID)
|
storageLeafCids := make(map[common.Hash][]StorageNodeCID)
|
||||||
for addr, storageTrie := range storageNodes {
|
for addr, storageTrie := range storageNodes {
|
||||||
storageLeafCids[addr] = make([]StorageNodeCID, 0)
|
storageLeafCids[addr] = make([]StorageNodeCID, 0, len(storageTrie))
|
||||||
for _, node := range storageTrie {
|
for _, node := range storageTrie {
|
||||||
storageNodeCid, err := pub.StoragePutter.DagPut(node.Value)
|
storageNodeCid, err := pub.StoragePutter.DagPut(node.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -17,15 +17,13 @@
|
|||||||
package ipfs
|
package ipfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/i-norden/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Still seeing some errors from tx and storage indexing processes... due to fk constraints being broken
|
|
||||||
|
|
||||||
// CIDRepository is an interface for indexing CIDPayloads
|
// CIDRepository is an interface for indexing CIDPayloads
|
||||||
type CIDRepository interface {
|
type CIDRepository interface {
|
||||||
Index(cidPayload *CIDPayload) error
|
Index(cidPayload *CIDPayload) error
|
||||||
@ -58,16 +56,17 @@ func (repo *Repository) Index(cidPayload *CIDPayload) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.Commit()
|
err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID)
|
||||||
err = repo.indexTransactionAndReceiptCIDs(cidPayload, headerID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = repo.indexStateAndStorageCIDs(cidPayload, headerID)
|
err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
|
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
|
||||||
@ -80,14 +79,13 @@ func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error {
|
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error {
|
||||||
_, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4)
|
_, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`,
|
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`,
|
||||||
blockNumber, hash, cid, false)
|
blockNumber, hash, cid, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, headerID int64) error {
|
func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
|
||||||
tx, _ := repo.db.Beginx()
|
|
||||||
for hash, trxCidMeta := range payload.TransactionCIDs {
|
for hash, trxCidMeta := range payload.TransactionCIDs {
|
||||||
var txID int64
|
var txID int64
|
||||||
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
|
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
|
||||||
@ -95,19 +93,17 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, head
|
|||||||
RETURNING id`,
|
RETURNING id`,
|
||||||
headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID)
|
headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
receiptCidMeta, ok := payload.ReceiptCIDs[hash]
|
receiptCidMeta, ok := payload.ReceiptCIDs[hash]
|
||||||
if ok {
|
if ok {
|
||||||
err = repo.indexReceiptCID(tx, receiptCidMeta, txID)
|
err = repo.indexReceiptCID(tx, receiptCidMeta, txID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error {
|
func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error {
|
||||||
@ -116,8 +112,7 @@ func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, t
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID int64) error {
|
func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
|
||||||
tx, _ := repo.db.Beginx()
|
|
||||||
for accountKey, stateCID := range payload.StateNodeCIDs {
|
for accountKey, stateCID := range payload.StateNodeCIDs {
|
||||||
var stateID int64
|
var stateID int64
|
||||||
err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4)
|
err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4)
|
||||||
@ -125,22 +120,20 @@ func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID i
|
|||||||
RETURNING id`,
|
RETURNING id`,
|
||||||
headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID)
|
headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, storageCID := range payload.StorageNodeCIDs[accountKey] {
|
for _, storageCID := range payload.StorageNodeCIDs[accountKey] {
|
||||||
err = repo.indexStorageCID(tx, storageCID, stateID)
|
err = repo.indexStorageCID(tx, storageCID, stateID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error {
|
func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error {
|
||||||
_, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4)
|
_, err := tx.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`,
|
ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`,
|
||||||
stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
|
stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
|
||||||
return err
|
return err
|
||||||
|
@ -32,10 +32,11 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
const payloadChanBufferSize = 8000 // the max eth sub buffer size
|
const payloadChanBufferSize = 20000 // the max eth sub buffer size
|
||||||
|
|
||||||
// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
|
// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
|
||||||
// indexing all Ethereum data screening this data, and serving it up to subscribed clients
|
// indexing all Ethereum data screening this data, and serving it up to subscribed clients
|
||||||
|
// This service is compatible with the Ethereum service interface (node.Service)
|
||||||
type SyncPublishScreenAndServe interface {
|
type SyncPublishScreenAndServe interface {
|
||||||
// APIs(), Protocols(), Start() and Stop()
|
// APIs(), Protocols(), Start() and Stop()
|
||||||
node.Service
|
node.Service
|
||||||
@ -264,9 +265,11 @@ func (sap *Service) Unsubscribe(id rpc.ID) error {
|
|||||||
func (sap *Service) Start(*p2p.Server) error {
|
func (sap *Service) Start(*p2p.Server) error {
|
||||||
log.Info("Starting statediff service")
|
log.Info("Starting statediff service")
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
payloadChan := make(chan IPLDPayload)
|
payloadChan := make(chan IPLDPayload, payloadChanBufferSize)
|
||||||
quitChan := make(chan bool)
|
quitChan := make(chan bool, 1)
|
||||||
sap.SyncAndPublish(wg, payloadChan, quitChan)
|
if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
sap.ScreenAndServe(wg, payloadChan, quitChan)
|
sap.ScreenAndServe(wg, payloadChan, quitChan)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -299,11 +302,11 @@ func (sap *Service) close() {
|
|||||||
for id, sub := range sap.Subscriptions {
|
for id, sub := range sap.Subscriptions {
|
||||||
select {
|
select {
|
||||||
case sub.QuitChan <- true:
|
case sub.QuitChan <- true:
|
||||||
delete(sap.Subscriptions, id)
|
|
||||||
log.Infof("closing subscription %s", id)
|
log.Infof("closing subscription %s", id)
|
||||||
default:
|
default:
|
||||||
log.Infof("unable to close subscription %s; channel has no receiver", id)
|
log.Infof("unable to close subscription %s; channel has no receiver", id)
|
||||||
}
|
}
|
||||||
|
delete(sap.Subscriptions, id)
|
||||||
}
|
}
|
||||||
sap.Unlock()
|
sap.Unlock()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user