diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index fe895d49..d2ac134b 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -56,7 +56,7 @@ func streamSubscribe() { str := streamer.NewSeedStreamer(rpcClient) // Buffered channel for reading subscription payloads - payloadChan := make(chan ipfs.ResponsePayload, 8000) + payloadChan := make(chan ipfs.ResponsePayload, 20000) // Subscribe to the seed node service with the given config/filter parameters sub, err := str.Stream(payloadChan, subConfig) diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 14c53bdf..b80f31f2 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -50,15 +50,15 @@ func syncPublishScreenAndServe() { blockChain, ethClient, rpcClient := getBlockChainAndClients() db := utils.LoadPostgres(databaseConfig, blockChain.Node()) - quitChan := make(chan bool) + quitChan := make(chan bool, 1) processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan) if err != nil { log.Fatal(err) } wg := &syn.WaitGroup{} - forwardPayloadChan := make(chan ipfs.IPLDPayload) - forwardQuitChan := make(chan bool) + forwardPayloadChan := make(chan ipfs.IPLDPayload, 20000) + forwardQuitChan := make(chan bool, 1) err = processor.SyncAndPublish(wg, forwardPayloadChan, forwardQuitChan) if err != nil { log.Fatal(err) diff --git a/db/migrations/00029_create_transaction_cids_table.sql b/db/migrations/00029_create_transaction_cids_table.sql index 79e44d12..ee2cca22 100644 --- a/db/migrations/00029_create_transaction_cids_table.sql +++ b/db/migrations/00029_create_transaction_cids_table.sql @@ -1,7 +1,7 @@ -- +goose Up CREATE TABLE public.transaction_cids ( id SERIAL PRIMARY KEY, - header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE, + header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, tx_hash VARCHAR(66) NOT NULL, cid TEXT NOT NULL, dst VARCHAR(66) NOT NULL, diff --git a/db/migrations/00030_create_receipt_cids_table.sql b/db/migrations/00030_create_receipt_cids_table.sql index a889d4f5..29372955 100644 --- a/db/migrations/00030_create_receipt_cids_table.sql +++ b/db/migrations/00030_create_receipt_cids_table.sql @@ -1,7 +1,7 @@ -- +goose Up CREATE TABLE public.receipt_cids ( id SERIAL PRIMARY KEY, - tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE, + tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, cid TEXT NOT NULL, topic0s VARCHAR(66)[] ); diff --git a/db/migrations/00031_create_state_cids_table.sql b/db/migrations/00031_create_state_cids_table.sql index b040bdee..6efce63d 100644 --- a/db/migrations/00031_create_state_cids_table.sql +++ b/db/migrations/00031_create_state_cids_table.sql @@ -1,7 +1,7 @@ -- +goose Up CREATE TABLE public.state_cids ( id SERIAL PRIMARY KEY, - header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE, + header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, state_key VARCHAR(66) NOT NULL, leaf BOOLEAN NOT NULL, cid TEXT NOT NULL, diff --git a/db/migrations/00032_create_storage_cids_table.sql b/db/migrations/00032_create_storage_cids_table.sql index 607ccb7c..2506be41 100644 --- a/db/migrations/00032_create_storage_cids_table.sql +++ b/db/migrations/00032_create_storage_cids_table.sql @@ -1,7 +1,7 @@ -- +goose Up CREATE TABLE public.storage_cids ( id SERIAL PRIMARY KEY, - state_id INTEGER NOT NULL REFERENCES state_cids (id) ON DELETE CASCADE, + state_id INTEGER NOT NULL REFERENCES state_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, storage_key VARCHAR(66) NOT NULL, leaf BOOLEAN NOT NULL, cid TEXT NOT NULL, diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go index 150ece3f..aecb0763 100644 --- a/pkg/ipfs/api.go +++ b/pkg/ipfs/api.go @@ -56,8 +56,8 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S go func() { // subscribe to events from the SyncPublishScreenAndServe service - payloadChannel := make(chan ResponsePayload) - quitChan := make(chan bool) + payloadChannel := make(chan ResponsePayload, payloadChanBufferSize) + quitChan := make(chan bool, 1) go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters) // loop and await state diff payloads and relay them to the subscriber with then notifier diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index fbee915a..8cef5a18 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -147,13 +147,6 @@ func (pub *Publisher) publishHeaders(headerRLP []byte) (string, error) { } func (pub *Publisher) publishTransactions(blockBody *types.Body, trxMeta []*TrxMetaData) (map[common.Hash]*TrxMetaData, error) { - /* - println("publishing transactions") - for _, trx := range blockBody.Transactions { - println("trx value:") - println(trx.Value().Int64()) - } - */ transactionCids, err := pub.TransactionPutter.DagPut(blockBody) if err != nil { return nil, err @@ -206,7 +199,7 @@ func (pub *Publisher) publishStateNodes(stateNodes map[common.Hash]StateNode) (m func (pub *Publisher) publishStorageNodes(storageNodes map[common.Hash][]StorageNode) (map[common.Hash][]StorageNodeCID, error) { storageLeafCids := make(map[common.Hash][]StorageNodeCID) for addr, storageTrie := range storageNodes { - storageLeafCids[addr] = make([]StorageNodeCID, 0) + storageLeafCids[addr] = make([]StorageNodeCID, 0, len(storageTrie)) for _, node := range storageTrie { storageNodeCid, err := pub.StoragePutter.DagPut(node.Value) if err != nil { diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go index a433eb71..66da684d 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/ipfs/repository.go @@ -17,15 +17,13 @@ package ipfs import ( - "github.com/i-norden/go-ethereum/core" + "github.com/ethereum/go-ethereum/core" "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -// Still seeing some errors from tx and storage indexing processes... due to fk constraints being broken - // CIDRepository is an interface for indexing CIDPayloads type CIDRepository interface { Index(cidPayload *CIDPayload) error @@ -58,16 +56,17 @@ func (repo *Repository) Index(cidPayload *CIDPayload) error { return err } } - tx.Commit() - err = repo.indexTransactionAndReceiptCIDs(cidPayload, headerID) + err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) if err != nil { + tx.Rollback() return err } - err = repo.indexStateAndStorageCIDs(cidPayload, headerID) + err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) if err != nil { + tx.Rollback() return err } - return nil + return tx.Commit() } func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) { @@ -80,14 +79,13 @@ func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash strin } func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error { - _, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) + _, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`, blockNumber, hash, cid, false) return err } -func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, headerID int64) error { - tx, _ := repo.db.Beginx() +func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for hash, trxCidMeta := range payload.TransactionCIDs { var txID int64 err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) @@ -95,19 +93,17 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, head RETURNING id`, headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID) if err != nil { - tx.Rollback() return err } receiptCidMeta, ok := payload.ReceiptCIDs[hash] if ok { err = repo.indexReceiptCID(tx, receiptCidMeta, txID) if err != nil { - tx.Rollback() return err } } } - return tx.Commit() + return nil } func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error { @@ -116,8 +112,7 @@ func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, t return err } -func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID int64) error { - tx, _ := repo.db.Beginx() +func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for accountKey, stateCID := range payload.StateNodeCIDs { var stateID int64 err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) @@ -125,22 +120,20 @@ func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID i RETURNING id`, headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID) if err != nil { - tx.Rollback() return err } for _, storageCID := range payload.StorageNodeCIDs[accountKey] { err = repo.indexStorageCID(tx, storageCID, stateID) if err != nil { - tx.Rollback() return err } } } - return tx.Commit() + return nil } func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error { - _, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4) + _, err := tx.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4) ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`, stateID, storageCID.Key, storageCID.CID, storageCID.Leaf) return err diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go index 062a2175..a89907b8 100644 --- a/pkg/ipfs/service.go +++ b/pkg/ipfs/service.go @@ -32,10 +32,11 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -const payloadChanBufferSize = 8000 // the max eth sub buffer size +const payloadChanBufferSize = 20000 // the max eth sub buffer size // SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing, // indexing all Ethereum data screening this data, and serving it up to subscribed clients +// This service is compatible with the Ethereum service interface (node.Service) type SyncPublishScreenAndServe interface { // APIs(), Protocols(), Start() and Stop() node.Service @@ -264,9 +265,11 @@ func (sap *Service) Unsubscribe(id rpc.ID) error { func (sap *Service) Start(*p2p.Server) error { log.Info("Starting statediff service") wg := new(sync.WaitGroup) - payloadChan := make(chan IPLDPayload) - quitChan := make(chan bool) - sap.SyncAndPublish(wg, payloadChan, quitChan) + payloadChan := make(chan IPLDPayload, payloadChanBufferSize) + quitChan := make(chan bool, 1) + if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil { + return err + } sap.ScreenAndServe(wg, payloadChan, quitChan) return nil } @@ -299,11 +302,11 @@ func (sap *Service) close() { for id, sub := range sap.Subscriptions { select { case sub.QuitChan <- true: - delete(sap.Subscriptions, id) log.Infof("closing subscription %s", id) default: log.Infof("unable to close subscription %s; channel has no receiver", id) } + delete(sap.Subscriptions, id) } sap.Unlock() }