diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index 7501dad0..2ea0479a 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -20,7 +20,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" @@ -31,20 +30,19 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/geth/node" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/utils" + "github.com/vulcanize/vulcanizedb/pkg/core" ) // syncAndPublishCmd represents the syncAndPublish command var syncAndPublishCmd = &cobra.Command{ Use: "syncAndPublish", - Short: "A brief description of your command", - Long: `A longer description that spans multiple lines and likely contains examples -and usage of using your command. For example: - -Cobra is a CLI library for Go that empowers applications. -This application is a tool to generate the needed files -to quickly create a Cobra application.`, + Short: "Syncs all Ethereum data into IPFS, indexing the CIDs", + Long: `This command works alongside a modified geth node which streams +all block and state (diff) data over a websocket subscription. This process +then converts the eth objects to IPLDs and publishes them IPFS. Additionally, +it maintains a local index of the IPLD objects' CIDs in Postgres.`, Run: func(cmd *cobra.Command, args []string) { - syncAndPulish() + syncAndPublish() }, } @@ -54,10 +52,10 @@ var ( func init() { rootCmd.AddCommand(syncAndPublishCmd) - syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "", "Path for configuring IPFS node") + syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node") } -func syncAndPulish() { +func syncAndPublish() { blockChain, ethClient, rpcClient := getBlockChainAndClients() db := utils.LoadPostgres(databaseConfig, blockChain.Node()) @@ -68,8 +66,11 @@ func syncAndPulish() { } wg := syn.WaitGroup{} - indexer.Index(wg) - wg.Wait() + err = indexer.Index(wg) + if err != nil { + log.Fatal(err) + } + wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through } func getBlockChainAndClients() (*geth.BlockChain, core.EthClient, core.RpcClient) { diff --git a/pkg/eth/blockchain.go b/pkg/eth/blockchain.go index c06b27f4..4bab19a0 100644 --- a/pkg/eth/blockchain.go +++ b/pkg/eth/blockchain.go @@ -18,10 +18,10 @@ package eth import ( "errors" - "github.com/ethereum/go-ethereum" "math/big" "strconv" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" diff --git a/pkg/eth/client/rpc_client.go b/pkg/eth/client/rpc_client.go index 3c8dafa6..fac16ccb 100644 --- a/pkg/eth/client/rpc_client.go +++ b/pkg/eth/client/rpc_client.go @@ -17,6 +17,7 @@ package client import ( + "errors" "context" "errors" "reflect" diff --git a/pkg/ipfs/indexer.go b/pkg/ipfs/indexer.go index 85859a55..baabd213 100644 --- a/pkg/ipfs/indexer.go +++ b/pkg/ipfs/indexer.go @@ -17,23 +17,27 @@ package ipfs import ( - log "github.com/sirupsen/logrus" "sync" + log "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) -const payloadChanBufferSize = 800 // 1/10th max size +const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size +// Indexer is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data +// This is the top-level interface used by the syncAndPublish command type Indexer interface { - Index() error + Index(wg sync.WaitGroup) error } -type IPFSIndexer struct { - Syncer Syncer +// ipfsIndexer is the underlying struct for the Indexer interface +// we are not exporting this to enforce proper initialization through the NewIPFSIndexer function +type ipfsIndexer struct { + Streamer Streamer Converter Converter Publisher Publisher Repository Repository @@ -41,13 +45,14 @@ type IPFSIndexer struct { QuitChan chan bool } -func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (*IPFSIndexer, error) { +// NewIPFSIndexer creates a new Indexer interface using an underlying ipfsIndexer struct +func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (Indexer, error) { publisher, err := NewIPLDPublisher(ipfsPath) if err != nil { return nil, err } - return &IPFSIndexer{ - Syncer: NewStateDiffSyncer(rpcClient), + return &ipfsIndexer{ + Streamer: NewStateDiffSyncer(rpcClient), Repository: NewCIDRepository(db), Converter: NewPayloadConverter(ethClient), Publisher: publisher, @@ -56,9 +61,9 @@ func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient, }, nil } -// The main processing loop for the syncAndPublish -func (i *IPFSIndexer) Index(wg sync.WaitGroup) error { - sub, err := i.Syncer.Sync(i.PayloadChan) +// Index is the main processing loop +func (i *ipfsIndexer) Index(wg sync.WaitGroup) error { + sub, err := i.Streamer.Stream(i.PayloadChan) if err != nil { return err } @@ -76,7 +81,7 @@ func (i *IPFSIndexer) Index(wg sync.WaitGroup) error { if err != nil { log.Error(err) } - err = i.Repository.IndexCIDs(cidPayload) + err = i.Repository.Index(cidPayload) if err != nil { log.Error(err) } diff --git a/pkg/ipfs/payload_converter.go b/pkg/ipfs/payload_converter.go index 24327c42..8573cf90 100644 --- a/pkg/ipfs/payload_converter.go +++ b/pkg/ipfs/payload_converter.go @@ -28,14 +28,17 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) +// Converter interface is used to convert a geth statediff.Payload to our IPLDPayload type type Converter interface { Convert(payload statediff.Payload) (*IPLDPayload, error) } +// PayloadConverter is the underlying struct for the Converter interface type PayloadConverter struct { client core.EthClient } +// IPLDPayload is a custom type which packages ETH data for the IPFS publisher type IPLDPayload struct { HeaderRLP []byte BlockNumber *big.Int @@ -46,12 +49,14 @@ type IPLDPayload struct { StorageLeafs map[common.Hash]map[common.Hash][]byte } +// NewPayloadConverter creates a pointer to a new PayloadConverter which satisfies the Converter interface func NewPayloadConverter(client core.EthClient) *PayloadConverter { return &PayloadConverter{ client: client, } } +// Convert method is used to convert a geth statediff.Payload to a IPLDPayload func (pc *PayloadConverter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack block rlp to access fields block := new(types.Block) diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index 023d11a8..9c137473 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -19,6 +19,7 @@ package ipfs import ( "errors" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/common" "github.com/vulcanize/eth-block-extractor/pkg/ipfs" @@ -30,10 +31,12 @@ import ( "github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp" ) +// Publisher is the interface for publishing an IPLD payload type Publisher interface { Publish(payload *IPLDPayload) (*CIDPayload, error) } +// IPLDPublisher is the underlying struct for the Publisher interface type IPLDPublisher struct { Node *ipfs.IPFS HeaderPutter *eth_block_header.BlockHeaderDagPutter @@ -43,6 +46,7 @@ type IPLDPublisher struct { StoragePutter *eth_storage_trie.StorageTrieDagPutter } +// CID payload is a struct to hold all the CIDs and their meta data type CIDPayload struct { BlockNumber string BlockHash string @@ -53,6 +57,7 @@ type CIDPayload struct { StorageLeafCIDs map[common.Hash]map[common.Hash]string } +// NewIPLDPublisher creates a pointer to a new IPLDPublisher which satisfies the Publisher interface func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { node, err := ipfs.InitIPFSNode(ipfsPath) if err != nil { @@ -69,45 +74,94 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { }, nil } +// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { // Process and publish headers - headerCids, err := pub.HeaderPutter.DagPut(payload.HeaderRLP) + headerCid, err := pub.publishHeaders(payload.HeaderRLP) if err != nil { return nil, err } - if len(headerCids) != 1 { - return nil, errors.New("single CID expected to be returned for header") - } // Process and publish transactions - transactionCids, err := pub.TransactionPutter.DagPut(payload.BlockBody) + transactionCids, err := pub.publishTransactions(payload.BlockBody) if err != nil { return nil, err } - if len(transactionCids) != len(payload.BlockBody.Transactions) { - return nil, errors.New("expected one CID for each transaction") - } - trxCids := make(map[common.Hash]string, len(transactionCids)) - for i, trx := range payload.BlockBody.Transactions { - trxCids[trx.Hash()] = transactionCids[i] - } // Process and publish receipts - receiptsCids, err := pub.ReceiptPutter.DagPut(payload.Receipts) + receiptsCids, err := pub.publishReceipts(payload.Receipts) if err != nil { return nil, err } - if len(receiptsCids) != len(payload.Receipts) { - return nil, errors.New("expected one CID for each receipt") - } - rctCids := make(map[common.Hash]string, len(receiptsCids)) - for i, rct := range payload.Receipts { - rctCids[rct.TxHash] = receiptsCids[i] - } // Process and publish state leafs + stateLeafCids, err := pub.publishStateLeafs(payload.StateLeafs) + if err != nil { + return nil, err + } + + // Process and publish storage leafs + storageLeafCids, err := pub.publishStorageLeafs(payload.StorageLeafs) + if err != nil { + return nil, err + } + + // Package CIDs into a single struct + return &CIDPayload{ + BlockHash: payload.BlockHash.Hex(), + BlockNumber: payload.BlockNumber.String(), + HeaderCID: headerCid, + TransactionCIDs: transactionCids, + ReceiptCIDs: receiptsCids, + StateLeafCIDs: stateLeafCids, + StorageLeafCIDs: storageLeafCids, + }, nil +} + +func (pub *IPLDPublisher) publishHeaders(headerRLP []byte) (string, error) { + headerCids, err := pub.HeaderPutter.DagPut(headerRLP) + if err != nil { + return "", err + } + if len(headerCids) != 1 { + return "", errors.New("single CID expected to be returned for header") + } + return headerCids[0], nil +} + +func (pub *IPLDPublisher) publishTransactions(blockBody *types.Body) (map[common.Hash]string, error) { + transactionCids, err := pub.TransactionPutter.DagPut(blockBody) + if err != nil { + return nil, err + } + if len(transactionCids) != len(blockBody.Transactions) { + return nil, errors.New("expected one CID for each transaction") + } + mappedTrxCids := make(map[common.Hash]string, len(transactionCids)) + for i, trx := range blockBody.Transactions { + mappedTrxCids[trx.Hash()] = transactionCids[i] + } + return mappedTrxCids, nil +} + +func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts) (map[common.Hash]string, error) { + receiptsCids, err := pub.ReceiptPutter.DagPut(receipts) + if err != nil { + return nil, err + } + if len(receiptsCids) != len(receipts) { + return nil, errors.New("expected one CID for each receipt") + } + mappedRctCids := make(map[common.Hash]string, len(receiptsCids)) + for i, rct := range receipts { + mappedRctCids[rct.TxHash] = receiptsCids[i] + } + return mappedRctCids, nil +} + +func (pub *IPLDPublisher) publishStateLeafs(stateLeafs map[common.Hash][]byte) (map[common.Hash]string, error) { stateLeafCids := make(map[common.Hash]string) - for addr, leaf := range payload.StateLeafs { + for addr, leaf := range stateLeafs { stateLeafCid, err := pub.StatePutter.DagPut(leaf) if err != nil { return nil, err @@ -117,10 +171,12 @@ func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { } stateLeafCids[addr] = stateLeafCid[0] } + return stateLeafCids, nil +} - // Process and publish storage leafs +func (pub *IPLDPublisher) publishStorageLeafs(storageLeafs map[common.Hash]map[common.Hash][]byte) (map[common.Hash]map[common.Hash]string, error) { storageLeafCids := make(map[common.Hash]map[common.Hash]string) - for addr, storageTrie := range payload.StorageLeafs { + for addr, storageTrie := range storageLeafs { storageLeafCids[addr] = make(map[common.Hash]string) for key, leaf := range storageTrie { storageLeafCid, err := pub.StoragePutter.DagPut(leaf) @@ -133,14 +189,5 @@ func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { storageLeafCids[addr][key] = storageLeafCid[0] } } - - return &CIDPayload{ - BlockHash: payload.BlockHash.Hex(), - BlockNumber: payload.BlockNumber.String(), - HeaderCID: headerCids[0], - TransactionCIDs: trxCids, - ReceiptCIDs: rctCids, - StateLeafCIDs: stateLeafCids, - StorageLeafCIDs: storageLeafCids, - }, nil + return storageLeafCids, nil } \ No newline at end of file diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go index 42090894..a52aa9af 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/ipfs/repository.go @@ -17,56 +17,72 @@ package ipfs import ( + "github.com/jmoiron/sqlx" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +// Repository is an interface for indexing CIDPayloads type Repository interface { - IndexCIDs(cidPayload *CIDPayload) error + Index(cidPayload *CIDPayload) error } +// CIDRepository is the underlying struct for the Repository interface type CIDRepository struct { db *postgres.DB } +// NewCIDRepository creates a new pointer to a CIDRepository which satisfies the Repository interface func NewCIDRepository(db *postgres.DB) *CIDRepository { return &CIDRepository{ db: db, } } -func (repo *CIDRepository) IndexCIDs(cidPayload *CIDPayload) error { - headerID, err := repo.indexHeaderCID(cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash) +// IndexCIDs indexes a cidPayload in Postgres +func (repo *CIDRepository) Index(cidPayload *CIDPayload) error { + tx, _ := repo.db.Beginx() + headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash) if err != nil { + tx.Rollback() return err } - err = repo.indexTransactionAndReceiptCIDs(cidPayload, headerID) + err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) if err != nil { + tx.Rollback() return err } - + err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() } -func (repo *CIDRepository) indexHeaderCID(cid, blockNumber, hash string) (int64, error) { +func (repo *CIDRepository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) { var headerID int64 - err := repo.db.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid) VALUES ($1, $2, $3) - RETURNING id - ON CONFLICT DO UPDATE SET cid = $3`, blockNumber, hash, cid).Scan(&headerID) + err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid) VALUES ($1, $2, $3) + ON CONFLICT DO UPDATE SET cid = $3 + RETURNING id`, + blockNumber, hash, cid).Scan(&headerID) return headerID, err } -func (repo *CIDRepository) indexTransactionAndReceiptCIDs(payload *CIDPayload, headerID int64) error { +func (repo *CIDRepository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for hash, trxCid := range payload.TransactionCIDs { var txID int64 - err := repo.db.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid) VALUES ($1, $2, $3) - RETURNING id - ON CONFLICT DO UPDATE SET cid = $3`, headerID, hash.Hex(), trxCid).Scan(&txID) + err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid) VALUES ($1, $2, $3) + ON CONFLICT DO UPDATE SET cid = $3 + RETURNING id`, + headerID, hash.Hex(), trxCid).Scan(&txID) if err != nil { return err } receiptCid, ok := payload.ReceiptCIDs[hash] if ok { - err = repo.indexReceiptCID(receiptCid, txID) + err = repo.indexReceiptCID(tx, receiptCid, txID) if err != nil { return err } @@ -75,23 +91,24 @@ func (repo *CIDRepository) indexTransactionAndReceiptCIDs(payload *CIDPayload, h return nil } -func (repo *CIDRepository) indexReceiptCID(cid string, txId int64) error { - _, err := repo.db.Exec(`INSERT INTO public.receipt_cids (tx_id, cid) VALUES ($1, $2) +func (repo *CIDRepository) indexReceiptCID(tx *sqlx.Tx, cid string, txId int64) error { + _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid) VALUES ($1, $2) ON CONFLICT DO UPDATE SET cid = $2`, txId, cid) return err } -func (repo *CIDRepository) indexStateAndStorageCIDs(payload *CIDPayload, headerID int64) error { +func (repo *CIDRepository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for accountKey, stateCID := range payload.StateLeafCIDs { var stateID int64 - err := repo.db.QueryRowx(`INSERT INTO public.state_cids (header_id, account_key, cid) VALUES ($1, $2, $3) - RETURNING id - ON CONFLICT DO UPDATE SET cid = $3`, headerID, accountKey.Hex(), stateCID).Scan(&stateID) + err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, account_key, cid) VALUES ($1, $2, $3) + ON CONFLICT DO UPDATE SET cid = $3 + RETURNING id`, + headerID, accountKey.Hex(), stateCID).Scan(&stateID) if err != nil { return err } for storageKey, storageCID := range payload.StorageLeafCIDs[accountKey] { - err = repo.indexStorageCID(storageKey.Hex(), storageCID, stateID) + err = repo.indexStorageCID(tx, storageKey.Hex(), storageCID, stateID) if err != nil { return err } @@ -100,7 +117,7 @@ func (repo *CIDRepository) indexStateAndStorageCIDs(payload *CIDPayload, headerI return nil } -func (repo *CIDRepository) indexStorageCID(key, cid string, stateId int64) error { +func (repo *CIDRepository) indexStorageCID(tx *sqlx.Tx, key, cid string, stateId int64) error { _, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid) VALUES ($1, $2, $3) ON CONFLICT DO UPDATE SET cid = $3`, stateId, key, cid) return err diff --git a/pkg/ipfs/syncer.go b/pkg/ipfs/streamer.go similarity index 60% rename from pkg/ipfs/syncer.go rename to pkg/ipfs/streamer.go index 87f6a19a..02b67305 100644 --- a/pkg/ipfs/syncer.go +++ b/pkg/ipfs/streamer.go @@ -23,22 +23,25 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/core" ) -type Syncer interface { - Sync(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) +// Streamer is the interface for streaming a statediff subscription +type Streamer interface { + Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) } -type StateDiffSyncer struct { +// StateDiffStreamer is the underlying struct for the Streamer interface +type StateDiffStreamer struct { Client core.RpcClient PayloadChan chan statediff.Payload } -func NewStateDiffSyncer(client core.RpcClient) *StateDiffSyncer { - return &StateDiffSyncer{ +// NewStateDiffStreamer creates a pointer to a new StateDiffStreamer which satisfies the Streamer interface +func NewStateDiffSyncer(client core.RpcClient) *StateDiffStreamer { + return &StateDiffStreamer{ Client: client, } } -// Sync is the main loop for subscribing to data from the Geth state diff process -func (i *StateDiffSyncer) Sync(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { +// Stream is the main loop for subscribing to data from the Geth state diff process +func (i *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { return i.Client.Subscribe("statediff", i.PayloadChan) } \ No newline at end of file