From 1dc46640cfe3e6a321d6b0e7057ce1e387f7810b Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 21 May 2019 14:27:24 -0500 Subject: [PATCH] request handler and response screener for seed node api service --- .../00028_create_header_cids_table.sql | 2 +- pkg/ipfs/api.go | 25 +- pkg/ipfs/api_test.go | 1 + pkg/ipfs/converter.go | 4 +- pkg/ipfs/fetcher.go | 154 ++++++++++++- pkg/ipfs/helpers.go | 13 ++ pkg/ipfs/publisher.go | 2 +- pkg/ipfs/repository.go | 19 +- pkg/ipfs/resolver.go | 86 +++++++ pkg/ipfs/retreiver.go | 195 ++++++++++++++++ pkg/ipfs/retreiver_test.go | 1 + pkg/ipfs/screener.go | 218 ++++++++++++++++++ pkg/ipfs/screener_test.go | 1 + pkg/ipfs/service.go | 124 +++++++--- pkg/ipfs/test_helpers/mocks/api.go | 1 + pkg/ipfs/test_helpers/mocks/screener.go | 1 + pkg/ipfs/test_helpers/test_data.go | 22 +- pkg/ipfs/types.go | 106 +++++---- 18 files changed, 862 insertions(+), 113 deletions(-) create mode 100644 pkg/ipfs/api_test.go create mode 100644 pkg/ipfs/resolver.go create mode 100644 pkg/ipfs/retreiver_test.go create mode 100644 pkg/ipfs/screener.go create mode 100644 pkg/ipfs/screener_test.go create mode 100644 pkg/ipfs/test_helpers/mocks/api.go create mode 100644 pkg/ipfs/test_helpers/mocks/screener.go diff --git a/db/migrations/00028_create_header_cids_table.sql b/db/migrations/00028_create_header_cids_table.sql index 8d63f299..331867c1 100644 --- a/db/migrations/00028_create_header_cids_table.sql +++ b/db/migrations/00028_create_header_cids_table.sql @@ -4,7 +4,7 @@ CREATE TABLE public.header_cids ( block_number BIGINT NOT NULL, block_hash VARCHAR(66) NOT NULL, cid TEXT NOT NULL, - uncle BOOLEAN NOT NULL, + final BOOLEAN NOT NULL, UNIQUE (block_number, block_hash) ); diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go index 15413678..e32203b5 100644 --- a/pkg/ipfs/api.go +++ b/pkg/ipfs/api.go @@ -18,7 +18,6 @@ package ipfs import ( "context" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -29,26 +28,36 @@ const APIName = 
"vulcanizedb" // APIVersion is the version of the state diffing service API const APIVersion = "0.0.1" -// PublicSeedNodeAPI +// PublicSeedNodeAPI is the public api for the seed node type PublicSeedNodeAPI struct { - snp SyncPublishAndServe + snp SyncPublishScreenAndServe } -// NewPublicSeedNodeAPI -func NewPublicSeedNodeAPI(snp SyncPublishAndServe) *PublicSeedNodeAPI { +// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process +func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI { return &PublicSeedNodeAPI{ snp: snp, } } // Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created -func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan ResponsePayload, params *Params) (*rpc.Subscription, error) { +func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeDefOnly chan ResponsePayload) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } + streamFilters := StreamFilters{} + streamFilters.HeaderFilter.FinalOnly = true + streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + streamFilters.ReceiptFilter.Topic0s = []string{ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377", + } + streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"} + streamFilters.StorageFilter.Off = true // create subscription and start waiting for statediff events rpcSub := notifier.CreateSubscription() @@ -56,7 +65,7 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan Re // subscribe to events from the 
state diff service payloadChannel := make(chan ResponsePayload) quitChan := make(chan bool) - api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, params) + go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters) // loop and await state diff payloads and relay them to the subscriber with then notifier for { @@ -79,4 +88,4 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan Re }() return rpcSub, nil -} +} \ No newline at end of file diff --git a/pkg/ipfs/api_test.go b/pkg/ipfs/api_test.go new file mode 100644 index 00000000..a0c59fc2 --- /dev/null +++ b/pkg/ipfs/api_test.go @@ -0,0 +1 @@ +package ipfs diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index fe349ca0..82609795 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -73,8 +73,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { return nil, err } txMeta := &TrxMetaData{ - To: handleNullAddr(trx.To()), - From: from.Hex(), + Dst: handleNullAddr(trx.To()), + Src: from.Hex(), } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta) diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index bdadfb57..6b6a9411 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -19,36 +19,174 @@ package ipfs import ( "context" + "github.com/ethereum/go-ethereum/common" + "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" + log "github.com/sirupsen/logrus" ) -// IPLDFetcher is the underlying struct which supports a IPLD fetching interface -type IPLDFetcher struct { +// IPLDFethcer is an interface for fetching IPLDs +type IPLDFetcher interface { + FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) +} + +// EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS +type EthIPLDFetcher struct { BlockService blockservice.BlockService } // NewIPLDFetcher creates a 
pointer to a new IPLDFetcher -func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { +func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) { blockService, err := InitIPFSBlockService(ipfsPath) if err != nil { return nil, err } - return &IPLDFetcher{ + return &EthIPLDFetcher{ BlockService: blockService, }, nil } -// Fetch is used to fetch a single block of IPFS data by cid -func (f *IPLDFetcher) Fetch(cid cid.Cid) (blocks.Block, error) { +// FetchCIDs is the exported method for fetching and returning all the cids passed in a cidWrapper +func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) { + blocks := &ipfsBlockWrapper{ + Headers: make([]blocks.Block, 0), + Transactions: make([]blocks.Block, 0), + Receipts: make([]blocks.Block, 0), + StateNodes: make(map[common.Hash]blocks.Block), + StorageNodes: make(map[common.Hash]map[common.Hash]blocks.Block), + } + + err := f.fetchHeaders(cids, blocks) + if err != nil { + return nil, err + } + err = f.fetchTrxs(cids, blocks) + if err != nil { + return nil, err + } + err = f.fetchRcts(cids, blocks) + if err != nil { + return nil, err + } + err = f.fetchStorage(cids, blocks) + if err != nil { + return nil, err + } + err = f.fetchState(cids, blocks) + if err != nil { + return nil, err + } + + return blocks, nil +} + +// fetchHeaders fetches headers +// It uses the f.fetchBatch method +func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper) error { + headerCids := make([]cid.Cid, 0, len(cids.Headers)) + for _, c := range cids.Headers { + dc, err := cid.Decode(c) + if err != nil { + return err + } + headerCids = append(headerCids, dc) + } + blocks.Headers = f.fetchBatch(headerCids) + if len(blocks.Headers) != len(headerCids) { + log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(blocks.Headers), len(headerCids)) + } + return nil +} + +// fetchTrxs fetches transactions +// It uses the f.fetchBatch method +func (f 
*EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) error { + trxCids := make([]cid.Cid, 0, len(cids.Transactions)) + for _, c := range cids.Transactions { + dc, err := cid.Decode(c) + if err != nil { + return err + } + trxCids = append(trxCids, dc) + } + blocks.Transactions = f.fetchBatch(trxCids) + if len(blocks.Transactions) != len(trxCids) { + log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(blocks.Transactions), len(trxCids)) + } + return nil +} + +// fetchRcts fetches receipts +// It uses the f.fetchBatch method +func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) error { + rctCids := make([]cid.Cid, 0, len(cids.Receipts)) + for _, c := range cids.Receipts { + dc, err := cid.Decode(c) + if err != nil { + return err + } + rctCids = append(rctCids, dc) + } + blocks.Receipts = f.fetchBatch(rctCids) + if len(blocks.Receipts) != len(rctCids) { + log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(blocks.Receipts), len(rctCids)) + } + return nil +} + +// fetchState fetches state nodes +// It uses the single f.fetch method instead of the batch fetch, because it +// needs to maintain the data's relation to state keys +func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) error { + for _, stateNode := range cids.StateNodes { + if stateNode.CID == "" || stateNode.Key == "" { + continue + } + dc, err := cid.Decode(stateNode.CID) + if err != nil { + return err + } + block, err := f.fetch(dc) + if err != nil { + return err + } + blocks.StateNodes[common.HexToHash(stateNode.Key)] = block + } + return nil +} + +// fetchStorage fetches storage nodes +// It uses the single f.fetch method instead of the batch fetch, because it +// needs to maintain the data's relation to state and storage keys +func (f *EthIPLDFetcher) fetchStorage(cids cidWrapper, blocks *ipfsBlockWrapper) error { + for _, 
storageNode := range cids.StorageNodes { + if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { + continue + } + dc, err := cid.Decode(storageNode.CID) + if err != nil { + return err + } + block, err := f.fetch(dc) + if err != nil { + return err + } + blocks.StorageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = block + } + return nil +} + +// fetch is used to fetch a single cid +func (f *EthIPLDFetcher) fetch(cid cid.Cid) (blocks.Block, error) { return f.BlockService.GetBlock(context.Background(), cid) } -// FetchBatch is used to fetch a batch of IPFS data blocks by cid +// fetchBatch is used to fetch a batch of IPFS data blocks by cid // There is no guarantee all are fetched, and no error in such a case, so // downstream we will need to confirm which CIDs were fetched in the result set -func (f *IPLDFetcher) FetchBatch(cids []cid.Cid) []blocks.Block { +func (f *EthIPLDFetcher) fetchBatch(cids []cid.Cid) []blocks.Block { fetchedBlocks := make([]blocks.Block, 0, len(cids)) blockChan := f.BlockService.GetBlocks(context.Background(), cids) for block := range blockChan { diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/helpers.go index de3c8c90..c2b8e15c 100644 --- a/pkg/ipfs/helpers.go +++ b/pkg/ipfs/helpers.go @@ -19,6 +19,8 @@ package ipfs import ( "context" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/repo/fsrepo" @@ -41,3 +43,14 @@ func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) { } return ipfsNode.Blocks, nil } + +// AddressToKey hashes an address +func AddressToKey(address common.Address) common.Hash { + return crypto.Keccak256Hash(address[:]) +} + +// HexToKey hashes a hex (0x leading) string +func HexToKey(hex string) common.Hash { + addr := common.HexToAddress(hex) + return crypto.Keccak256Hash(addr[:]) +} diff --git a/pkg/ipfs/publisher.go 
b/pkg/ipfs/publisher.go index 7f9b4d99..f2490040 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -209,7 +209,7 @@ func (pub *Publisher) publishStorageNodes(storageNodes map[common.Hash][]Storage return nil, errors.New("single CID expected to be returned for storage leaf") } storageLeafCids[addr] = append(storageLeafCids[addr], StorageNodeCID{ - Key: node.Key, + Key: node.Key.Hex(), CID: storageNodeCid[0], Leaf: node.Leaf, }) diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go index 1729ffa2..474e2921 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/ipfs/repository.go @@ -14,7 +14,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Still seeing some errors from tx and storage indexing processes... due to fk constraints being broken package ipfs import ( @@ -24,6 +23,8 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) +// Still seeing some errors from tx and storage indexing processes... 
due to fk constraints being broken + // CIDRepository is an interface for indexing CIDPayloads type CIDRepository interface { Index(cidPayload *CIDPayload) error @@ -70,17 +71,17 @@ func (repo *Repository) Index(cidPayload *CIDPayload) error { func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4) + err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4) RETURNING id`, - blockNumber, hash, cid, false).Scan(&headerID) + blockNumber, hash, cid, true).Scan(&headerID) return headerID, err } func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error { - _, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)`, - blockNumber, hash, cid, true) + _, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`, + blockNumber, hash, cid, false) return err } @@ -91,7 +92,7 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, head err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5) RETURNING id`, - headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.To, trxCidMeta.From).Scan(&txID) + headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID) if err != nil { tx.Rollback() return err @@ -140,6 +141,6 @@ 
func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID i func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error { _, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4) ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`, - stateID, storageCID.Key.Hex(), storageCID.CID, storageCID.Leaf) + stateID, storageCID.Key, storageCID.CID, storageCID.Leaf) return err } diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go new file mode 100644 index 00000000..e0352b15 --- /dev/null +++ b/pkg/ipfs/resolver.go @@ -0,0 +1,86 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package ipfs + +import ( + "github.com/ethereum/go-ethereum/common" + + "github.com/ipfs/go-block-format" +) + +type IPLDResolver interface { + ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) +} + +type EthIPLDResolver struct{} + +func NewIPLDResolver() *EthIPLDResolver { + return &EthIPLDResolver{} +} + +func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) { + response := new(ResponsePayload) + eir.resolveHeaders(ipfsBlocks.Headers, response) + eir.resolveTransactions(ipfsBlocks.Transactions, response) + eir.resolveReceipts(ipfsBlocks.Receipts, response) + eir.resolveState(ipfsBlocks.StateNodes, response) + eir.resolveStorage(ipfsBlocks.StorageNodes, response) + return response, nil +} + +func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *ResponsePayload) { + for _, block := range blocks { + raw := block.RawData() + response.HeadersRlp = append(response.HeadersRlp, raw) + } +} + +func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) { + for _, block := range blocks { + raw := block.RawData() + response.TransactionsRlp = append(response.TransactionsRlp, raw) + } +} + +func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *ResponsePayload) { + for _, block := range blocks { + raw := block.RawData() + response.ReceiptsRlp = append(response.ReceiptsRlp, raw) + } +} + +func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *ResponsePayload) { + if response.StateNodesRlp == nil { + response.StateNodesRlp = make(map[common.Hash][]byte) + } + for key, block := range blocks { + raw := block.RawData() + response.StateNodesRlp[key] = raw + } +} + +func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *ResponsePayload) { + if response.StateNodesRlp == nil { + response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) + } + for 
stateKey, storageBlocks := range blocks { + for storageKey, storageVal := range storageBlocks { + raw := storageVal.RawData() + response.StorageNodesRlp[stateKey][storageKey] = raw + } + } +} diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go index 63fc346e..02f805b9 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/ipfs/retreiver.go @@ -15,3 +15,198 @@ // along with this program. If not, see . package ipfs + +import ( + "github.com/jmoiron/sqlx" + "github.com/lib/pq" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type CIDRetriever interface { + RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) +} + +type EthCIDRetriever struct { + db *postgres.DB +} + +func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever { + return &EthCIDRetriever{ + db: db, + } +} + +func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) { + var blockNumber int64 + err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ") + return blockNumber, err +} +func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) { + var endingBlock int64 + var err error + if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock { + endingBlock, err = ecr.GetLastBlockNumber() + if err != nil { + return nil, err + } + } + cids := make([]cidWrapper, 0, endingBlock+1-streamFilters.StartingBlock) + tx, err := ecr.db.Beginx() + if err != nil { + return nil, err + } + for i := streamFilters.StartingBlock; i <= endingBlock; i++ { + cw := &cidWrapper{ + BlockNumber: i, + Headers: make([]string, 0), + Transactions: make([]string, 0), + Receipts: make([]string, 0), + StateNodes: make([]StateNodeCID, 0), + StorageNodes: make([]StorageNodeCID, 0), + } + if !streamFilters.HeaderFilter.Off { + err = ecr.retrieveHeaderCIDs(tx, streamFilters, cw, i) + if err != nil { + tx.Rollback() + return nil, err + } + } + var trxIds []int64 + if !streamFilters.TrxFilter.Off { + 
trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, cw, i) + if err != nil { + tx.Rollback() + return nil, err + } + } + if !streamFilters.ReceiptFilter.Off { + err = ecr.retrieveRctCIDs(tx, streamFilters, cw, i, trxIds) + if err != nil { + tx.Rollback() + return nil, err + } + } + if !streamFilters.StateFilter.Off { + err = ecr.retrieveStateCIDs(tx, streamFilters, cw, i) + if err != nil { + tx.Rollback() + return nil, err + } + } + if !streamFilters.StorageFilter.Off { + err = ecr.retrieveStorageCIDs(tx, streamFilters, cw, i) + if err != nil { + tx.Rollback() + return nil, err + } + } + cids = append(cids, *cw) + } + + return cids, err +} + +func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { + var pgStr string + if streamFilters.HeaderFilter.FinalOnly { + pgStr = `SELECT cid FROM header_cids + WHERE block_number = $1 + AND final IS TRUE` + } else { + pgStr = `SELECT cid FROM header_cids + WHERE block_number = $1` + } + return tx.Select(cids.Headers, pgStr, blockNumber) +} + +func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) ([]int64, error) { + args := make([]interface{}, 0, 3) + type result struct { + Id int64 `db:"id"` + Cid string `db:"cid"` + } + results := make([]result, 0) + pgStr := `SELECT transaction_cids.id, transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + args = append(args, blockNumber) + if len(streamFilters.TrxFilter.Dst) > 0 { + pgStr += ` AND transaction_cids.dst = ANY($2::VARCHAR(66)[])` + args = append(args, pq.Array(streamFilters.TrxFilter.Dst)) + } + if len(streamFilters.TrxFilter.Src) > 0 { + pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])` + args = append(args, pq.Array(streamFilters.TrxFilter.Src)) + } + err := tx.Select(results, pgStr, args...) 
+ if err != nil { + return nil, err + } + ids := make([]int64, 0) + for _, res := range results { + cids.Transactions = append(cids.Transactions, res.Cid) + ids = append(ids, res.Id) + } + return ids, nil +} + +func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64, trxIds []int64) error { + args := make([]interface{}, 0, 2) + pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + args = append(args, blockNumber) + if len(streamFilters.ReceiptFilter.Topic0s) > 0 { + pgStr += ` AND (receipt_cids.topic0s && $2::VARCHAR(66)[]` + args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s)) + } + if len(trxIds) > 0 { + pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` + args = append(args, pq.Array(trxIds)) + } else { + pgStr += `)` + } + return tx.Select(cids.Receipts, pgStr, args...) +} + +func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { + args := make([]interface{}, 0, 2) + pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + args = append(args, blockNumber) + addrLen := len(streamFilters.StateFilter.Addresses) + if addrLen > 0 { + keys := make([]string, 0, addrLen) + for _, addr := range streamFilters.StateFilter.Addresses { + keys = append(keys, HexToKey(addr).Hex()) + } + pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` + args = append(args, pq.Array(keys)) + } + return tx.Select(cids.StateNodes, pgStr, args...) 
+} + +func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error { + args := make([]interface{}, 0, 3) + pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + args = append(args, blockNumber) + addrLen := len(streamFilters.StorageFilter.Addresses) + if addrLen > 0 { + keys := make([]string, 0, addrLen) + for _, addr := range streamFilters.StorageFilter.Addresses { + keys = append(keys, HexToKey(addr).Hex()) + } + pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` + args = append(args, pq.Array(keys)) + } + if len(streamFilters.StorageFilter.StorageKeys) > 0 { + pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])` + args = append(args, pq.Array(streamFilters.StorageFilter.StorageKeys)) + } + return tx.Select(cids.StorageNodes, pgStr, args...) +} + +// ADD IF LEAF ONLY!! diff --git a/pkg/ipfs/retreiver_test.go b/pkg/ipfs/retreiver_test.go new file mode 100644 index 00000000..a0c59fc2 --- /dev/null +++ b/pkg/ipfs/retreiver_test.go @@ -0,0 +1 @@ +package ipfs diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go new file mode 100644 index 00000000..1dfe5ab1 --- /dev/null +++ b/pkg/ipfs/screener.go @@ -0,0 +1,218 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+ +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipfs + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" +) + +// ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload +type ResponseScreener interface { + ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) +} + +// Screener is the underlying struct for the ReponseScreener interface +type Screener struct{} + +// NewResponseScreener creates a new Screener satisfyign the ReponseScreener interface +func NewResponseScreener() *Screener { + return &Screener{} +} + +// ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload +func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) { + response := new(ResponsePayload) + err := s.filterHeaders(streamFilters, response, payload) + if err != nil { + return nil, err + } + txHashes, err := s.filterTransactions(streamFilters, response, payload) + if err != nil { + return nil, err + } + err = s.filerReceipts(streamFilters, response, payload, txHashes) + if err != nil { + return nil, err + } + err = s.filterState(streamFilters, response, payload) + if err != nil { + return nil, err + } + err = s.filterStorage(streamFilters, response, payload) + if err != nil { + return nil, err + } + return response, nil +} + +func (s *Screener) filterHeaders(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error { + if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { + response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) + if !streamFilters.HeaderFilter.FinalOnly { + for _, uncle := 
range payload.BlockBody.Uncles { + uncleRlp, err := rlp.EncodeToBytes(uncle) + if err != nil { + return err + } + response.UnclesRlp = append(response.UnclesRlp, uncleRlp) + } + } + } + return nil +} + +func checkRange(start, end, actual int64) bool { + if (end <= 0 || end >= actual) && start <= actual { + return true + } + return false +} + +func (s *Screener) filterTransactions(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) { + trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) + if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { + for i, trx := range payload.BlockBody.Transactions { + if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { + trxBuffer := new(bytes.Buffer) + err := trx.EncodeRLP(trxBuffer) + if err != nil { + return nil, err + } + trxHashes = append(trxHashes, trx.Hash()) + response.TransactionsRlp = append(response.TransactionsRlp, trxBuffer.Bytes()) + } + } + } + return trxHashes, nil +} + +func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool { + // If we aren't filtering for any addresses, every transaction is a go + if len(wantedDst) == 0 && len(wantedSrc) == 0 { + return true + } + for _, src := range wantedSrc { + if src == actualSrc { + return true + } + } + for _, dst := range wantedDst { + if dst == actualDst { + return true + } + } + return false +} + +func (s *Screener) filerReceipts(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { + if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { + for i, receipt := range payload.Receipts { + if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, 
payload.ReceiptMetaData[i].Topic0s, trxHashes) { + receiptBuffer := new(bytes.Buffer) + err := receipt.EncodeRLP(receiptBuffer) + if err != nil { + return err + } + response.ReceiptsRlp = append(response.ReceiptsRlp, receiptBuffer.Bytes()) + } + } + } + return nil +} + +func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, wantedTrxHashes []common.Hash) bool { + // If we aren't filtering for any topics, all topics are a go + if len(wantedTopics) == 0 { + return true + } + for _, wantedTrxHash := range wantedTrxHashes { + if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { + return true + } + } + for _, wantedTopic := range wantedTopics { + for _, actualTopic := range actualTopics { + if wantedTopic == actualTopic { + return true + } + } + } + return false +} + +func (s *Screener) filterState(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error { + response.StateNodesRlp = make(map[common.Hash][]byte) + if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { + keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) + for _, addr := range streamFilters.StateFilter.Addresses { + keyFilter := AddressToKey(common.HexToAddress(addr)) + keyFilters = append(keyFilters, keyFilter) + } + for key, stateNode := range payload.StateNodes { + if checkNodeKeys(keyFilters, key) { + if stateNode.Leaf || streamFilters.StateFilter.IntermediateNodes { + response.StateNodesRlp[key] = stateNode.Value + } + } + } + } + return nil +} + +func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { + // If we aren't filtering for any specific keys, all nodes are a go + if len(wantedKeys) == 0 { + return true + } + for _, key := range wantedKeys { + if bytes.Equal(key.Bytes(), actualKey.Bytes()) { + return true + } + } + return false +} + +func (s *Screener) filterStorage(streamFilters *StreamFilters, response 
*ResponsePayload, payload IPLDPayload) error { + if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { + stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) + for _, addr := range streamFilters.StorageFilter.Addresses { + keyFilter := AddressToKey(common.HexToAddress(addr)) + stateKeyFilters = append(stateKeyFilters, keyFilter) + } + storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys)) + for _, store := range streamFilters.StorageFilter.StorageKeys { + keyFilter := HexToKey(store) + storageKeyFilters = append(storageKeyFilters, keyFilter) + } + for stateKey, storageNodes := range payload.StorageNodes { + if checkNodeKeys(stateKeyFilters, stateKey) { + response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) + for _, storageNode := range storageNodes { + if checkNodeKeys(storageKeyFilters, storageNode.Key) { + response.StorageNodesRlp[stateKey][storageNode.Key] = storageNode.Value + } + } + } + } + } + return nil +} diff --git a/pkg/ipfs/screener_test.go b/pkg/ipfs/screener_test.go new file mode 100644 index 00000000..a0c59fc2 --- /dev/null +++ b/pkg/ipfs/screener_test.go @@ -0,0 +1 @@ +package ipfs diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go index 9e0b2eed..7430a683 100644 --- a/pkg/ipfs/service.go +++ b/pkg/ipfs/service.go @@ -32,22 +32,22 @@ import ( const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size -// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data -// This is the top-level interface used by the syncAndPublish command -type SyncPublishAndServe interface { +// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing, +// indexing all Ethereum data screening this data, and serving it up to subscribed clients +type SyncPublishScreenAndServe interface { // APIs(), Protocols(), Start() 
and Stop() node.Service // Main event loop for syncAndPublish processes SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error // Main event loop for handling client pub-sub - Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) + ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params) + Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error } -// Processor is the underlying struct for the SyncAndPublish interface +// Service is the underlying struct for the SyncAndPublish interface type Service struct { // Used to sync access to the Subscriptions sync.Mutex @@ -59,6 +59,14 @@ type Service struct { Publisher IPLDPublisher // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres Repository CIDRepository + // Interface for filtering and serving data according to subscribed clients according to their specification + Screener ResponseScreener + // Interface for fetching ETH-IPLD objects from IPFS + Fetcher IPLDFetcher + // Interface for searching and retrieving CIDs from Postgres index + Retriever CIDRetriever + // Interface for resolving ipfs blocks to their data types + Resolver IPLDResolver // Chan the processor uses to subscribe to state diff payloads from the Streamer PayloadChan chan statediff.Payload // Used to signal shutdown of the service @@ -68,18 +76,27 @@ type Service struct { } // NewIPFSProcessor creates a new Processor interface using an underlying Processor struct -func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishAndServe, error) { +func 
NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishScreenAndServe, error) { publisher, err := NewIPLDPublisher(ipfsPath) if err != nil { return nil, err } + fetcher, err := NewIPLDFetcher(ipfsPath) + if err != nil { + return nil, err + } return &Service{ - Streamer: NewStateDiffStreamer(rpcClient), - Repository: NewCIDRepository(db), - Converter: NewPayloadConverter(ethClient), - Publisher: publisher, - PayloadChan: make(chan statediff.Payload, payloadChanBufferSize), - QuitChan: qc, + Streamer: NewStateDiffStreamer(rpcClient), + Repository: NewCIDRepository(db), + Converter: NewPayloadConverter(ethClient), + Publisher: publisher, + Screener: NewResponseScreener(), + Fetcher: fetcher, + Retriever: NewCIDRetriever(db), + Resolver: NewIPLDResolver(), + PayloadChan: make(chan statediff.Payload, payloadChanBufferSize), + QuitChan: qc, + Subscriptions: make(map[rpc.ID]Subscription), }, nil } @@ -121,6 +138,8 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- log.Error(err) continue } + // If we have a ScreenAndServe process running, forward the payload to it + // If the ScreenAndServe process loop is slower than this one, will it miss some incoming payloads?? 
select { case forwardPayloadChan <- *ipldPayload: default: @@ -137,6 +156,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- case err = <-sub.Err(): log.Error(err) case <-sap.QuitChan: + // If we have a ScreenAndServe process running, forward the quit signal to it select { case forwardQuitchan <- true: default: @@ -151,16 +171,19 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- return nil } -func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) { +// ScreenAndServe is the processing loop used to screen data streamed from the state diffing eth node and send the appropriate data to a requesting client subscription +func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) { wg.Add(1) go func() { for { select { case payload := <-receivePayloadChan: - println(payload.BlockNumber.Int64()) - // Method for using subscription parameters to filter payload and stream relevent info to sub channel + err := sap.processResponse(payload) + if err != nil { + log.Error(err) + } case <-receiveQuitchan: - log.Info("quiting Serve process") + log.Info("quiting ScreenAndServe process") wg.Done() return } @@ -168,15 +191,59 @@ func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayl }() } -// Subscribe is used by the API to subscribe to the StateDiffingService loop -func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params) { +func (sap *Service) processResponse(payload IPLDPayload) error { + for id, sub := range sap.Subscriptions { + response, err := sap.Screener.ScreenResponse(sub.StreamFilters, payload) + if err != nil { + return err + } + sap.serve(id, *response) + } + return nil +} + +// Subscribe is used by the API to subscribe to the service loop +func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, 
quitChan chan<- bool, streamFilters *StreamFilters) { log.Info("Subscribing to the statediff service") sap.Lock() sap.Subscriptions[id] = Subscription{ - PayloadChan: sub, - QuitChan: quitChan, + PayloadChan: sub, + QuitChan: quitChan, + StreamFilters: streamFilters, } sap.Unlock() + // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data + // Otherwise we only filter new data as it is streamed in from the state diffing geth node + if streamFilters.BackFill { + // Retrieve cached CIDs relevant to this subscriber + cids, err := sap.Retriever.RetrieveCIDs(*streamFilters) + if err != nil { + log.Error(err) + sap.serve(id, ResponsePayload{ + Err: err, + }) + return + } + for _, cid := range cids { + blocksWrapper, err := sap.Fetcher.FetchCIDs(cid) + if err != nil { + log.Error(err) + sap.serve(id, ResponsePayload{ + Err: err, + }) + return + } + backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) + if err != nil { + log.Error(err) + sap.serve(id, ResponsePayload{ + Err: err, + }) + return + } + sap.serve(id, *backFillIplds) + } + } } // Unsubscribe is used to unsubscribe to the StateDiffingService loop @@ -192,28 +259,29 @@ func (sap *Service) Unsubscribe(id rpc.ID) error { return nil } -// Start is used to begin the StateDiffingService +// Start is used to begin the service func (sap *Service) Start(*p2p.Server) error { log.Info("Starting statediff service") wg := new(sync.WaitGroup) payloadChan := make(chan IPLDPayload) quitChan := make(chan bool) - go sap.SyncAndPublish(wg, payloadChan, quitChan) - go sap.Serve(wg, payloadChan, quitChan) + sap.SyncAndPublish(wg, payloadChan, quitChan) + sap.ScreenAndServe(wg, payloadChan, quitChan) return nil } -// Stop is used to close down the StateDiffingService +// Stop is used to close down the service func (sap *Service) Stop() error { log.Info("Stopping statediff service") close(sap.QuitChan) return nil } -// send is used to fan out and serve a payload to any 
subscriptions -func (sap *Service) send(payload ResponsePayload) { +// serve is used to send screened payloads to their requesting sub +func (sap *Service) serve(id rpc.ID, payload ResponsePayload) { sap.Lock() - for id, sub := range sap.Subscriptions { + sub, ok := sap.Subscriptions[id] + if ok { select { case sub.PayloadChan <- payload: log.Infof("sending state diff payload to subscription %s", id) diff --git a/pkg/ipfs/test_helpers/mocks/api.go b/pkg/ipfs/test_helpers/mocks/api.go new file mode 100644 index 00000000..f726b26e --- /dev/null +++ b/pkg/ipfs/test_helpers/mocks/api.go @@ -0,0 +1 @@ +package mocks diff --git a/pkg/ipfs/test_helpers/mocks/screener.go b/pkg/ipfs/test_helpers/mocks/screener.go new file mode 100644 index 00000000..f726b26e --- /dev/null +++ b/pkg/ipfs/test_helpers/mocks/screener.go @@ -0,0 +1 @@ +package mocks diff --git a/pkg/ipfs/test_helpers/test_data.go b/pkg/ipfs/test_helpers/test_data.go index 83fc7f94..8ce24d8e 100644 --- a/pkg/ipfs/test_helpers/test_data.go +++ b/pkg/ipfs/test_helpers/test_data.go @@ -24,18 +24,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) -// AddressToLeafKey hashes an returns an address -func AddressToLeafKey(address common.Address) common.Hash { - return common.BytesToHash(crypto.Keccak256(address[:])) -} - // Test variables var ( BlockNumber = big.NewInt(rand.Int63()) @@ -55,9 +49,9 @@ var ( }} emptyStorage = make([]statediff.StorageDiff, 0) address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") - ContractLeafKey = AddressToLeafKey(address) + ContractLeafKey = ipfs.AddressToKey(address) anotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") - AnotherContractLeafKey = 
AddressToLeafKey(anotherAddress) + AnotherContractLeafKey = ipfs.AddressToKey(anotherAddress) testAccount = state.Account{ Nonce: NewNonceValue, Balance: big.NewInt(NewBalanceValue), @@ -143,14 +137,14 @@ var ( HeaderCID: "mockHeaderCID", TransactionCIDs: map[common.Hash]*ipfs.TrxMetaData{ common.HexToHash("0x0"): { - CID: "mockTrxCID1", - To: "mockTo1", - From: "mockFrom1", + CID: "mockTrxCID1", + Dst: "mockTo1", + Src: "mockFrom1", }, common.HexToHash("0x1"): { - CID: "mockTrxCID2", - To: "mockTo2", - From: "mockFrom2", + CID: "mockTrxCID2", + Dst: "mockTo2", + Src: "mockFrom2", }, }, ReceiptCIDs: map[common.Hash]*ipfs.ReceiptMetaData{ diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index 228cf85c..36cafb02 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -20,23 +20,28 @@ import ( "encoding/json" "math/big" + "github.com/ipfs/go-block-format" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) -// +// Subscription holds the information for an individual client subscription type Subscription struct { - PayloadChan chan<- ResponsePayload - QuitChan chan<- bool + PayloadChan chan<- ResponsePayload + QuitChan chan<- bool + StreamFilters *StreamFilters } +// ResponsePayload holds the data returned from the seed node to the requesting client type ResponsePayload struct { - HeadersRlp [][]byte `json:"headersRlp"` - UnclesRlp [][]byte `json:"unclesRlp"` - TransactionsRlp [][]byte `json:"transactionsRlp"` - ReceiptsRlp [][]byte `json:"receiptsRlp"` - StateNodesRlp [][]byte `json:"stateNodesRlp"` - StorageNodesRlp [][]byte `json:"storageNodesRlp"` + HeadersRlp [][]byte `json:"headersRlp"` + UnclesRlp [][]byte `json:"unclesRlp"` + TransactionsRlp [][]byte `json:"transactionsRlp"` + ReceiptsRlp [][]byte `json:"receiptsRlp"` + StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"` + StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"` + Err error `json:"error"` encoded []byte err error @@ -60,6 
+65,23 @@ func (sd *ResponsePayload) Encode() ([]byte, error) { return sd.encoded, sd.err } +type cidWrapper struct { + BlockNumber int64 + Headers []string + Transactions []string + Receipts []string + StateNodes []StateNodeCID + StorageNodes []StorageNodeCID +} + +type ipfsBlockWrapper struct { + Headers []blocks.Block + Transactions []blocks.Block + Receipts []blocks.Block + StateNodes map[common.Hash]blocks.Block + StorageNodes map[common.Hash]map[common.Hash]blocks.Block +} + // IPLDPayload is a custom type which packages ETH data for the IPFS publisher type IPLDPayload struct { HeaderRLP []byte @@ -73,11 +95,13 @@ type IPLDPayload struct { StorageNodes map[common.Hash][]StorageNode } +// StateNode struct used to flag node as leaf or not type StateNode struct { Value []byte Leaf bool } +// StorageNode struct used to flag node as leaf or not type StorageNode struct { Key common.Hash Value []byte @@ -96,15 +120,19 @@ type CIDPayload struct { StorageNodeCIDs map[common.Hash][]StorageNodeCID } +// StateNodeCID is used to associate a leaf flag with a state node cid type StateNodeCID struct { CID string Leaf bool + Key string `db:"state_key"` } +// StorageNodeCID is used to associate a leaf flag with a storage node cid type StorageNodeCID struct { - Key common.Hash - CID string - Leaf bool + Key string `db:"storage_key"` + CID string + Leaf bool + StateKey string `db:"state_key"` } // ReceiptMetaData wraps some additional data around our receipt CIDs for indexing @@ -115,45 +143,39 @@ type ReceiptMetaData struct { // TrxMetaData wraps some additional data around our transaction CID for indexing type TrxMetaData struct { - CID string - To string - From string + CID string + Src string + Dst string } -// Params are set by the client to tell the server how to filter that is fed into their subscription -type Params struct { - HeaderFilter struct { - Off bool - StartingBlock int64 - EndingBlock int64 // set to 0 or a negative value to have no ending block - Uncles bool 
+// StreamFilters are defined by the client to specify which data to receive from the seed node +type StreamFilters struct { + BackFill bool + BackFillOnly bool + StartingBlock int64 + EndingBlock int64 // set to 0 or a negative value to have no ending block + HeaderFilter struct { + Off bool + FinalOnly bool } TrxFilter struct { - Off bool - StartingBlock int64 - EndingBlock int64 - Src string - Dst string + Off bool + Src []string + Dst []string } ReceiptFilter struct { - Off bool - StartingBlock int64 - EndingBlock int64 - Topic0s []string + Off bool + Topic0s []string } StateFilter struct { - Off bool - StartingBlock int64 - EndingBlock int64 - Address string // is converted to state key by taking its keccak256 hash - LeafsOnly bool + Off bool + Addresses []string // is converted to state key by taking its keccak256 hash + IntermediateNodes bool } StorageFilter struct { - Off bool - StartingBlock int64 - EndingBlock int64 - Address string - StorageKey string - LeafsOnly bool + Off bool + Addresses []string + StorageKeys []string + IntermediateNodes bool } }