diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index 4ff3dede..586ed49f 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -133,14 +133,14 @@ func streamEthSubscription() { // This assumes leafs only for _, stateNode := range ethData.StateNodes { var acct state.Account - err = rlp.Decode(bytes.NewBuffer(stateNode.IPLD.Data), &acct) + err = rlp.DecodeBytes(stateNode.IPLD.Data, &acct) if err != nil { logWithCommand.Error(err) continue } - fmt.Printf("Account for key %s, and root %s, with balance %d\n", - stateNode.StateTrieKey.Hex(), acct.Root.Hex(), acct.Balance.Int64()) - fmt.Printf("state account: %v\n", acct) + fmt.Printf("Account for key %s, and root %s, with balance %s\n", + stateNode.StateTrieKey.Hex(), acct.Root.Hex(), acct.Balance.String()) + fmt.Printf("state account: %+v\n", acct) } for _, storageNode := range ethData.StorageNodes { fmt.Printf("Storage for state key %s ", storageNode.StateTrieKey.Hex()) diff --git a/pkg/eth/converters/common/block_rewards.go b/pkg/eth/converters/common/block_rewards.go index 668254e0..114c0c92 100644 --- a/pkg/eth/converters/common/block_rewards.go +++ b/pkg/eth/converters/common/block_rewards.go @@ -76,10 +76,10 @@ func staticRewardByBlockNumber(blockNumber int64) *big.Int { return staticBlockReward } -func CalcEthBlockReward(block *types.Block, receipts types.Receipts) *big.Int { - staticBlockReward := staticRewardByBlockNumber(block.Number().Int64()) - transactionFees := calcEthTransactionFees(block, receipts) - uncleInclusionRewards := calcEthUncleInclusionRewards(block, block.Uncles()) +func CalcEthBlockReward(header *types.Header, uncles []*types.Header, txs types.Transactions, receipts types.Receipts) *big.Int { + staticBlockReward := staticRewardByBlockNumber(header.Number.Int64()) + transactionFees := calcEthTransactionFees(txs, receipts) + uncleInclusionRewards := calcEthUncleInclusionRewards(header, uncles) tmp := transactionFees.Add(transactionFees, uncleInclusionRewards) return tmp.Add(tmp, staticBlockReward) } @@ -94,9 +94,9 @@ func CalcUncleMinerReward(blockNumber, uncleBlockNumber int64) *big.Int { return rewardDiv8.Mul(rewardDiv8, uncleBlockPlus8MinusMainBlock) } -func calcEthTransactionFees(block *types.Block, receipts types.Receipts) *big.Int { +func calcEthTransactionFees(txs types.Transactions, receipts types.Receipts) *big.Int { transactionFees := new(big.Int) - for i, transaction := range block.Transactions() { + for i, transaction := range txs { receipt := receipts[i] gasPrice := big.NewInt(transaction.GasPrice().Int64()) gasUsed := big.NewInt(int64(receipt.GasUsed)) @@ -106,10 +106,10 @@ func calcEthTransactionFees(block *types.Block, receipts types.Receipts) *big.In return transactionFees } -func calcEthUncleInclusionRewards(block *types.Block, uncles []*types.Header) *big.Int { +func calcEthUncleInclusionRewards(header *types.Header, uncles []*types.Header) *big.Int { uncleInclusionRewards := new(big.Int) for range uncles { - staticBlockReward := staticRewardByBlockNumber(block.Number().Int64()) + staticBlockReward := staticRewardByBlockNumber(header.Number.Int64()) staticBlockReward.Div(staticBlockReward, big.NewInt(32)) uncleInclusionRewards.Add(uncleInclusionRewards, staticBlockReward) } diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index 8e8e74b9..292984f3 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -41,8 +41,8 @@ type ConvertedPayload struct { } // Height satisfies the StreamedIPLDs interface -func (i ConvertedPayload) Height() int64 { - return i.BlockPayload.BlockHeight +func (cp ConvertedPayload) Height() int64 { + return cp.BlockPayload.BlockHeight } // CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index 2a7979e8..7b4eb19f 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -71,8 +71,8 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert return nil, err } txMeta := TxModel{ - Dst: handleNullAddr(trx.To()), - Src: handleNullAddr(&from), + Dst: shared.HandleNullAddr(trx.To()), + Src: shared.HandleNullAddr(&from), TxHash: trx.Hash().String(), Index: int64(i), } @@ -169,10 +169,3 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert } return convertedPayload, nil } - -func handleNullAddr(to *common.Address) string { - if to == nil { - return "0x0000000000000000000000000000000000000000000000000000000000000000" - } - return to.Hex() -} diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 8e1ccad8..01b63dd9 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -51,6 +51,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh } if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) { response := new(IPLDs) + response.TotalDifficulty = ethPayload.TotalDifficulty if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil { return IPLDs{}, err } diff --git a/pkg/super_node/eth/ipld_fetcher.go b/pkg/super_node/eth/ipld_fetcher.go index 4401c774..c14fc732 100644 --- a/pkg/super_node/eth/ipld_fetcher.go +++ b/pkg/super_node/eth/ipld_fetcher.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "math/big" "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" @@ -58,9 +59,13 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) } log.Debug("fetching iplds") - iplds := IPLDs{} - iplds.BlockNumber = cidWrapper.BlockNumber var err error + iplds := IPLDs{} + iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10) + if !ok { + return nil, errors.New("eth fetcher: unable to set total difficulty") + } + iplds.BlockNumber = cidWrapper.BlockNumber iplds.Header, err = f.FetchHeader(cidWrapper.Header) if err != nil { return nil, err diff --git a/pkg/super_node/eth/ipld_fetcher_test.go b/pkg/super_node/eth/ipld_fetcher_test.go index d409862c..e76480fb 100644 --- a/pkg/super_node/eth/ipld_fetcher_test.go +++ b/pkg/super_node/eth/ipld_fetcher_test.go @@ -51,7 +51,8 @@ var ( mockCIDWrapper = ð.CIDWrapper{ BlockNumber: big.NewInt(9000), Header: eth.HeaderModel{ - CID: mockHeaderBlock.Cid().String(), + TotalDifficulty: "1337", + CID: mockHeaderBlock.Cid().String(), }, Uncles: []eth.UncleModel{ { @@ -104,6 +105,7 @@ var _ = Describe("Fetcher", func() { Expect(err).ToNot(HaveOccurred()) iplds, ok := i.(eth.IPLDs) Expect(ok).To(BeTrue()) + Expect(iplds.TotalDifficulty).To(Equal(big.NewInt(1337))) Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) Expect(iplds.Header).To(Equal(ipfs.BlockModel{ Data: mockHeaderBlock.RawData(), diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 5272daec..ce4d5da3 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -64,7 +64,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI if err != nil { return nil, err } - reward := common2.CalcEthBlockReward(ipldPayload.Block, ipldPayload.Receipts) + reward := common2.CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) header := HeaderModel{ CID: headerCid, ParentHash: ipldPayload.Block.ParentHash().String(), diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 67aac568..cd295368 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -78,13 +78,14 @@ type CIDWrapper struct { // IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server // Returned by IPLDFetcher and ResponseFilterer type IPLDs struct { - BlockNumber *big.Int - Header ipfs.BlockModel - Uncles []ipfs.BlockModel - Transactions []ipfs.BlockModel - Receipts []ipfs.BlockModel - StateNodes []StateNode - StorageNodes []StorageNode + BlockNumber *big.Int + TotalDifficulty *big.Int + Header ipfs.BlockModel + Uncles []ipfs.BlockModel + Transactions []ipfs.BlockModel + Receipts []ipfs.BlockModel + StateNodes []StateNode + StorageNodes []StorageNode } // Height satisfies the StreamedIPLDs interface diff --git a/pkg/super_node/shared/functions.go b/pkg/super_node/shared/functions.go index a4f3f3d9..ecb6ce3f 100644 --- a/pkg/super_node/shared/functions.go +++ b/pkg/super_node/shared/functions.go @@ -19,6 +19,8 @@ package shared import ( "bytes" + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) @@ -51,3 +53,11 @@ func ListContainsGap(gapList []Gap, gap Gap) bool { } return false } + +// HandleNullAddr converts a nil pointer to an address to a zero-valued hex address string +func HandleNullAddr(to *common.Address) string { + if to == nil { + return "0x0000000000000000000000000000000000000000000000000000000000000000" + } + return to.Hex() +} diff --git a/pkg/wasm/instantiator.go b/pkg/wasm/instantiator.go index f45ca07c..9f9c2065 100644 --- a/pkg/wasm/instantiator.go +++ b/pkg/wasm/instantiator.go @@ -41,6 +41,7 @@ func NewWASMInstantiator(db *postgres.DB, instances []WasmFunction) *Instantiato // Instantiate is used to load the WASM functions into Postgres func (i *Instantiator) Instantiate() error { + // TODO: enable instantiation of WASM functions from IPFS tx, err := i.db.Beginx() if err != nil { return err diff --git a/pkg/watcher/btc/repository.go b/pkg/watcher/btc/repository.go new file mode 100644 index 00000000..07482785 --- /dev/null +++ b/pkg/watcher/btc/repository.go @@ -0,0 +1 @@ +package btc diff --git a/pkg/watcher/eth/converter.go b/pkg/watcher/eth/converter.go new file mode 100644 index 00000000..91a7ccf4 --- /dev/null +++ b/pkg/watcher/eth/converter.go @@ -0,0 +1,166 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package eth + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + + common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// WatcherConverter converts watched data into models for the trigger tables +type WatcherConverter struct { + chainConfig *params.ChainConfig +} + +// NewWatcherConverter creates a pointer to a new WatcherConverter +func NewWatcherConverter(chainConfig *params.ChainConfig) *WatcherConverter { + return &WatcherConverter{ + chainConfig: chainConfig, + } +} + +// Convert method is used to convert eth iplds to an cid payload +// Satisfies the shared.PayloadConverter interface +func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error) { + numTxs := len(ethIPLDs.Transactions) + numRcts := len(ethIPLDs.Receipts) + if numTxs != numRcts { + return nil, fmt.Errorf("eth converter needs same numbe of receipts and transactions, have %d transactions and %d receipts", numTxs, numRcts) + } + // Initialize the payload struct and its fields + cids := new(eth.CIDPayload) + cids.UncleCIDs = make([]eth.UncleModel, len(ethIPLDs.Uncles)) + cids.TransactionCIDs = make([]eth.TxModel, numTxs) + cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs) + cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes)) + cids.StorageNodeCIDs = make(map[common.Hash][]eth.StorageNodeModel, len(ethIPLDs.StateNodes)) + + // Unpack header + var header types.Header + if err := rlp.DecodeBytes(ethIPLDs.Header.Data, &header); err != nil { + return nil, err + } + // Collect uncles so we can derive miner reward + uncles := make([]*types.Header, len(ethIPLDs.Uncles)) + for i, uncleIPLD := range ethIPLDs.Uncles { + var uncle types.Header + if err := rlp.DecodeBytes(uncleIPLD.Data, &uncle); err != nil { + return nil, err + } + uncleReward := common2.CalcUncleMinerReward(header.Number.Int64(), uncle.Number.Int64()) + uncles[i] = &uncle + // Uncle data + cids.UncleCIDs[i] = eth.UncleModel{ + CID: uncleIPLD.CID, + BlockHash: uncle.Hash().String(), + ParentHash: uncle.ParentHash.String(), + Reward: uncleReward.String(), + } + } + // Collect transactions so we can derive receipt fields and miner reward + signer := types.MakeSigner(pc.chainConfig, header.Number) + transactions := make(types.Transactions, len(ethIPLDs.Transactions)) + for i, txIPLD := range ethIPLDs.Transactions { + var tx types.Transaction + if err := rlp.DecodeBytes(txIPLD.Data, &tx); err != nil { + return nil, err + } + transactions[i] = &tx + from, err := types.Sender(signer, &tx) + if err != nil { + return nil, err + } + // Tx data + cids.TransactionCIDs[i] = eth.TxModel{ + Dst: shared.HandleNullAddr(tx.To()), + Src: shared.HandleNullAddr(&from), + TxHash: tx.Hash().String(), + Index: int64(i), + CID: txIPLD.CID, + } + } + // Collect receipts so that we can derive the rest of their fields and miner reward + receipts := make(types.Receipts, len(ethIPLDs.Receipts)) + for i, rctIPLD := range ethIPLDs.Receipts { + var rct types.Receipt + if err := rlp.DecodeBytes(rctIPLD.Data, &rct); err != nil { + return nil, err + } + receipts[i] = &rct + } + if err := receipts.DeriveFields(pc.chainConfig, header.Hash(), header.Number.Uint64(), transactions); err != nil { + return nil, err + } + for i, receipt := range receipts { + matchedTx := transactions[i] + if matchedTx.To() != nil { + receipt.ContractAddress = *transactions[i].To() + } + topicSets := make([][]string, 4) + for _, log := range receipt.Logs { + for i := range topicSets { + if i < len(log.Topics) { + topicSets[i] = append(topicSets[i], log.Topics[i].Hex()) + } + } + } + // Rct data + cids.ReceiptCIDs[matchedTx.Hash()] = eth.ReceiptModel{ + CID: ethIPLDs.Receipts[i].CID, + Topic0s: topicSets[0], + Topic1s: topicSets[1], + Topic2s: topicSets[2], + Topic3s: topicSets[3], + Contract: receipt.ContractAddress.Hex(), + } + } + minerReward := common2.CalcEthBlockReward(&header, uncles, transactions, receipts) + // Header data + cids.HeaderCID = eth.HeaderModel{ + CID: ethIPLDs.Header.CID, + ParentHash: header.ParentHash.String(), + BlockHash: header.Hash().String(), + BlockNumber: header.Number.String(), + TotalDifficulty: ethIPLDs.TotalDifficulty.String(), + Reward: minerReward.String(), + } + // State data + for i, stateIPLD := range ethIPLDs.StateNodes { + cids.StateNodeCIDs[i] = eth.StateNodeModel{ + CID: stateIPLD.IPLD.CID, + Leaf: stateIPLD.Leaf, + StateKey: stateIPLD.StateTrieKey.String(), + } + } + // Storage data + for _, storageIPLD := range ethIPLDs.StorageNodes { + cids.StorageNodeCIDs[storageIPLD.StateTrieKey] = append(cids.StorageNodeCIDs[storageIPLD.StateTrieKey], eth.StorageNodeModel{ + CID: storageIPLD.IPLD.CID, + Leaf: storageIPLD.Leaf, + StorageKey: storageIPLD.StorageTrieKey.String(), + }) + } + return cids, nil +} diff --git a/pkg/watcher/eth/repository.go b/pkg/watcher/eth/repository.go index 48a55d46..cde4255d 100644 --- a/pkg/watcher/eth/repository.go +++ b/pkg/watcher/eth/repository.go @@ -17,20 +17,25 @@ package eth import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + "io/ioutil" + + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) var ( - vacuumThreshold int64 = 5000 // dont know how to decided what this should be set to + vacuumThreshold int64 = 5000 ) // Repository is the underlying struct for satisfying the shared.Repository interface for eth type Repository struct { + cidIndexer *eth.CIDIndexer + converter *WatcherConverter db *postgres.DB triggerFunctions []string deleteCalls int64 @@ -39,6 +44,8 @@ type Repository struct { // NewRepository returns a new eth.Repository that satisfies the shared.Repository interface func NewRepository(db *postgres.DB, triggerFunctions []string) shared.Repository { return &Repository{ + cidIndexer: eth.NewCIDIndexer(db), + converter: NewWatcherConverter(params.MainnetChainConfig), db: db, triggerFunctions: triggerFunctions, deleteCalls: 0, @@ -46,9 +53,25 @@ func NewRepository(db *postgres.DB, triggerFunctions []string) shared.Repository } // LoadTriggers is used to initialize Postgres trigger function -// this needs to be called after the wasm functions these triggers invoke have been instantiated +// this needs to be called after the wasm functions these triggers invoke have been instantiated in Postgres func (r *Repository) LoadTriggers() error { - panic("implement me") + // TODO: enable loading of triggers from IPFS + tx, err := r.db.Beginx() + if err != nil { + return err + } + for _, funcPath := range r.triggerFunctions { + sqlFile, err := ioutil.ReadFile(funcPath) + if err != nil { + return err + } + sqlString := string(sqlFile) + if _, err := tx.Exec(sqlString); err != nil { + return err + } + + } + return tx.Commit() } // QueueData puts super node payload data into the db queue @@ -59,14 +82,14 @@ func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error { return err } -// GetQueueData grabs a chunk super node payload data from the queue table so that it can -// be forwarded to the ready table -// this is used to make sure we enter data into the ready table in sequential order +// GetQueueData grabs payload data from the queue table so that it can +// be forwarded to the ready tables +// this is used to make sure we enter data into the tables that triggers act on in sequential order // even if we receive data out-of-order // it returns the new index // delete the data it retrieves so as to clear the queue +// periodically vacuum's the table to free up space from the deleted rows func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) { - r.deleteCalls++ pgStr := `DELETE FROM eth.queued_data WHERE height = $1 RETURNING *` @@ -74,13 +97,15 @@ func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, if err := r.db.Get(&res, pgStr, height); err != nil { return super_node.SubscriptionPayload{}, height, err } + // If the delete get query succeeded, increment deleteCalls and height and prep payload to return + r.deleteCalls++ + height++ payload := super_node.SubscriptionPayload{ Data: res.Data, Height: res.Height, Flag: super_node.EmptyFlag, } - height++ - // Periodically clean up space in the queue table + // Periodically clean up space in the queued data table if r.deleteCalls >= vacuumThreshold { _, err := r.db.Exec(`VACUUM ANALYZE eth.queued_data`) if err != nil { @@ -91,31 +116,77 @@ func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, return payload, height, nil } -// ReadyData puts super node payload data in the tables ready for processing by trigger functions +// ReadyData puts data in the tables ready for processing by trigger functions func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error { - panic("implement me") + var ethIPLDs eth.IPLDs + if err := rlp.DecodeBytes(payload.Data, ðIPLDs); err != nil { + return err + } + if err := r.readyIPLDs(ethIPLDs); err != nil { + return err + } + cids, err := r.converter.Convert(ethIPLDs) + if err != nil { + return err + } + // Use indexer to persist all of the cid meta data + // trigger functions will act on these tables + return r.cidIndexer.Index(cids) } -func (r *Repository) readyHeader(header *types.Header) error { - panic("implement me") -} - -func (r *Repository) readyUncle(uncle *types.Header) error { - panic("implement me") -} - -func (r *Repository) readyTxs(transactions types.Transactions) error { - panic("implement me") -} - -func (r *Repository) readyRcts(receipts types.Receipts) error { - panic("implement me") -} - -func (r *Repository) readyState(stateNodes map[common.Address][]byte) error { - panic("implement me") -} - -func (r *Repository) readyStorage(storageNodes map[common.Address]map[common.Address][]byte) error { - panic("implement me") +// readyIPLDs adds IPLDs directly to the Postgres `blocks` table, rather than going through an IPFS node +func (r *Repository) readyIPLDs(ethIPLDs eth.IPLDs) error { + tx, err := r.db.Beginx() + if err != nil { + return err + } + pgStr := `INSERT INTO blocks (key, data) VALUES ($1, $2) + ON CONFLICT (key) DO UPDATE SET (data) = ($2)` + if _, err := tx.Exec(pgStr, ethIPLDs.Header.CID, ethIPLDs.Header.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + for _, uncle := range ethIPLDs.Uncles { + if _, err := tx.Exec(pgStr, uncle.CID, uncle.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + for _, trx := range ethIPLDs.Transactions { + if _, err := tx.Exec(pgStr, trx.CID, trx.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + for _, rct := range ethIPLDs.Receipts { + if _, err := tx.Exec(pgStr, rct.CID, rct.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + for _, state := range ethIPLDs.StateNodes { + if _, err := tx.Exec(pgStr, state.IPLD.CID, state.IPLD.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + for _, storage := range ethIPLDs.StorageNodes { + if _, err := tx.Exec(pgStr, storage.IPLD.CID, storage.IPLD.Data); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + return nil } diff --git a/pkg/watcher/example/sql/transfer_table.sql b/pkg/watcher/example/sql/transfer_table.sql new file mode 100644 index 00000000..895b6a2a --- /dev/null +++ b/pkg/watcher/example/sql/transfer_table.sql @@ -0,0 +1,10 @@ +CREATE TABLE eth.token_transfers ( + id SERIAL PRIMARY KEY, + receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, + log_index INTEGER NOT NULL, + contract_address VARCHAR(66) NOT NULL, + src VARCHAR(66) NOT NULL, + dst VARCHAR(66) NOT NULL, + amount NUMERIC NOT NULL, + UNIQUE (receipt_id, log_index) +); \ No newline at end of file diff --git a/pkg/watcher/example/sql/transfer_trigger.sql b/pkg/watcher/example/sql/transfer_trigger.sql new file mode 100644 index 00000000..bb3700b7 --- /dev/null +++ b/pkg/watcher/example/sql/transfer_trigger.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION transfer_trigger() RETURNS trigger AS +$BODY$ +BEGIN + SELECT * + +END; +$BODY$ \ No newline at end of file diff --git a/pkg/watcher/service.go b/pkg/watcher/service.go index a67d9615..8081f6a9 100644 --- a/pkg/watcher/service.go +++ b/pkg/watcher/service.go @@ -53,10 +53,8 @@ type Service struct { QuitChan chan bool // Indexes - // use atomic operations on these ONLY payloadIndex *int64 - endingIndex *int64 - backFilling *int32 // 0 => not backfilling; 1 => backfilling + endingIndex int64 } // NewWatcher returns a new Service which satisfies the Watcher interface @@ -100,13 +98,7 @@ func (s *Service) Watch(wg *sync.WaitGroup) error { return err } atomic.StoreInt64(s.payloadIndex, s.WatcherConfig.SubscriptionConfig.StartingBlock().Int64()) - atomic.StoreInt64(s.endingIndex, s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64()) // less than 0 => never end - backFilling := s.WatcherConfig.SubscriptionConfig.HistoricalData() - if backFilling { - atomic.StoreInt32(s.backFilling, 1) - } else { - atomic.StoreInt32(s.backFilling, 0) - } + s.endingIndex = s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64() // less than 0 => never end backFillOnly := s.WatcherConfig.SubscriptionConfig.HistoricalDataOnly() if backFillOnly { // we are only processing historical data => handle single contiguous stream s.backFillOnlyQueuing(wg, sub) @@ -123,7 +115,7 @@ func (s *Service) Watch(wg *sync.WaitGroup) error { // NOTE: maybe we should push everything to the wait queue, otherwise the index could be shifted as we retrieve data from it func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) { wg.Add(1) - // this goroutine is responsible for allocating incoming data to the ready or wait queue + // This goroutine is responsible for allocating incoming data to the ready or wait queue // depending on if it is at the current index or not forwardQuit := make(chan bool) go func() { @@ -141,8 +133,14 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio if err := s.Repository.ReadyData(payload); err != nil { logrus.Error(err) } - atomic.AddInt64(s.payloadIndex, 1) - } else { // otherwise add it to the wait queue + // Increment the current index and if we have exceeded our ending height shut down the watcher + if atomic.AddInt64(s.payloadIndex, 1) > s.endingIndex { + logrus.Info("Watcher has reached ending block height, shutting down") + forwardQuit <- true + wg.Done() + return + } + } else { // Otherwise add it to the wait queue if err := s.Repository.QueueData(payload); err != nil { logrus.Error(err) } @@ -158,27 +156,29 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio } }() ticker := time.NewTicker(5 * time.Second) - // this goroutine is responsible for moving data from the wait queue to the ready queue + // This goroutine is responsible for moving data from the wait queue to the ready queue // preserving the correct order and alignment with the current index go func() { for { select { case <-ticker.C: - // retrieve queued data, in order, and forward it to the ready queue - index := atomic.LoadInt64(s.payloadIndex) - queueData, newIndex, err := s.Repository.GetQueueData(index) + // Retrieve queued data, in order, and forward it to the ready queue + queueData, newIndex, err := s.Repository.GetQueueData(atomic.LoadInt64(s.payloadIndex)) if err != nil { logrus.Error(err) continue } atomic.StoreInt64(s.payloadIndex, newIndex) + if atomic.LoadInt64(s.payloadIndex) > s.endingIndex { + s.QuitChan <- true + } if err := s.Repository.ReadyData(queueData); err != nil { logrus.Error(err) } case <-forwardQuit: return default: - // do nothing, wait til next tick + // Do nothing, wait til next tick } } }()