diff --git a/db/migrations/00028_create_header_cids_table.sql b/db/migrations/00028_create_header_cids_table.sql
index 8d63f299..331867c1 100644
--- a/db/migrations/00028_create_header_cids_table.sql
+++ b/db/migrations/00028_create_header_cids_table.sql
@@ -4,7 +4,7 @@ CREATE TABLE public.header_cids (
block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL,
cid TEXT NOT NULL,
- uncle BOOLEAN NOT NULL,
+ final BOOLEAN NOT NULL,
UNIQUE (block_number, block_hash)
);
diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go
index 15413678..e32203b5 100644
--- a/pkg/ipfs/api.go
+++ b/pkg/ipfs/api.go
@@ -18,7 +18,6 @@ package ipfs
import (
"context"
-
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -29,26 +28,36 @@ const APIName = "vulcanizedb"
// APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1"
-// PublicSeedNodeAPI
+// PublicSeedNodeAPI is the public api for the seed node
type PublicSeedNodeAPI struct {
- snp SyncPublishAndServe
+ snp SyncPublishScreenAndServe
}
-// NewPublicSeedNodeAPI
-func NewPublicSeedNodeAPI(snp SyncPublishAndServe) *PublicSeedNodeAPI {
+// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process
+func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI {
return &PublicSeedNodeAPI{
snp: snp,
}
}
// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created
-func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan ResponsePayload, params *Params) (*rpc.Subscription, error) {
+func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeDefOnly chan ResponsePayload) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
+ streamFilters := StreamFilters{}
+ streamFilters.HeaderFilter.FinalOnly = true
+ streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
+ streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
+ streamFilters.ReceiptFilter.Topic0s = []string{
+ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
+ "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377",
+ }
+ streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
+ streamFilters.StorageFilter.Off = true
// create subscription and start waiting for statediff events
rpcSub := notifier.CreateSubscription()
@@ -56,7 +65,7 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan Re
// subscribe to events from the state diff service
payloadChannel := make(chan ResponsePayload)
quitChan := make(chan bool)
- api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
+ go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
// loop and await state diff payloads and relay them to the subscriber with then notifier
for {
@@ -79,4 +88,4 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChan chan Re
}()
return rpcSub, nil
-}
+}
\ No newline at end of file
diff --git a/pkg/ipfs/api_test.go b/pkg/ipfs/api_test.go
new file mode 100644
index 00000000..a0c59fc2
--- /dev/null
+++ b/pkg/ipfs/api_test.go
@@ -0,0 +1 @@
+package ipfs
diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go
index fe349ca0..82609795 100644
--- a/pkg/ipfs/converter.go
+++ b/pkg/ipfs/converter.go
@@ -73,8 +73,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
return nil, err
}
txMeta := &TrxMetaData{
- To: handleNullAddr(trx.To()),
- From: from.Hex(),
+ Dst: handleNullAddr(trx.To()),
+ Src: from.Hex(),
}
// txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody
convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta)
diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go
index bdadfb57..6b6a9411 100644
--- a/pkg/ipfs/fetcher.go
+++ b/pkg/ipfs/fetcher.go
@@ -19,36 +19,174 @@ package ipfs
import (
"context"
+ "github.com/ethereum/go-ethereum/common"
+
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
+ log "github.com/sirupsen/logrus"
)
-// IPLDFetcher is the underlying struct which supports a IPLD fetching interface
-type IPLDFetcher struct {
+// IPLDFethcer is an interface for fetching IPLDs
+type IPLDFetcher interface {
+ FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error)
+}
+
+// EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS
+type EthIPLDFetcher struct {
BlockService blockservice.BlockService
}
// NewIPLDFetcher creates a pointer to a new IPLDFetcher
-func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) {
+func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
blockService, err := InitIPFSBlockService(ipfsPath)
if err != nil {
return nil, err
}
- return &IPLDFetcher{
+ return &EthIPLDFetcher{
BlockService: blockService,
}, nil
}
-// Fetch is used to fetch a single block of IPFS data by cid
-func (f *IPLDFetcher) Fetch(cid cid.Cid) (blocks.Block, error) {
+// FetchCIDs is the exported method for fetching and returning all the cids passed in a cidWrapper
+func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) {
+ blocks := &ipfsBlockWrapper{
+ Headers: make([]blocks.Block, 0),
+ Transactions: make([]blocks.Block, 0),
+ Receipts: make([]blocks.Block, 0),
+ StateNodes: make(map[common.Hash]blocks.Block),
+ StorageNodes: make(map[common.Hash]map[common.Hash]blocks.Block),
+ }
+
+ err := f.fetchHeaders(cids, blocks)
+ if err != nil {
+ return nil, err
+ }
+ err = f.fetchTrxs(cids, blocks)
+ if err != nil {
+ return nil, err
+ }
+ err = f.fetchRcts(cids, blocks)
+ if err != nil {
+ return nil, err
+ }
+ err = f.fetchStorage(cids, blocks)
+ if err != nil {
+ return nil, err
+ }
+ err = f.fetchState(cids, blocks)
+ if err != nil {
+ return nil, err
+ }
+
+ return blocks, nil
+}
+
+// fetchHeaders fetches headers
+// It uses the f.fetchBatch method
+func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper) error {
+ headerCids := make([]cid.Cid, 0, len(cids.Headers))
+ for _, c := range cids.Headers {
+ dc, err := cid.Decode(c)
+ if err != nil {
+ return err
+ }
+ headerCids = append(headerCids, dc)
+ }
+ blocks.Headers = f.fetchBatch(headerCids)
+ if len(blocks.Headers) != len(headerCids) {
+ log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(blocks.Headers), len(headerCids))
+ }
+ return nil
+}
+
+// fetchTrxs fetches transactions
+// It uses the f.fetchBatch method
+func (f *EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) error {
+ trxCids := make([]cid.Cid, 0, len(cids.Transactions))
+ for _, c := range cids.Transactions {
+ dc, err := cid.Decode(c)
+ if err != nil {
+ return err
+ }
+ trxCids = append(trxCids, dc)
+ }
+ blocks.Transactions = f.fetchBatch(trxCids)
+ if len(blocks.Transactions) != len(trxCids) {
+ log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(blocks.Transactions), len(trxCids))
+ }
+ return nil
+}
+
+// fetchRcts fetches receipts
+// It uses the f.fetchBatch method
+func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) error {
+ rctCids := make([]cid.Cid, 0, len(cids.Receipts))
+ for _, c := range cids.Receipts {
+ dc, err := cid.Decode(c)
+ if err != nil {
+ return err
+ }
+ rctCids = append(rctCids, dc)
+ }
+ blocks.Receipts = f.fetchBatch(rctCids)
+ if len(blocks.Receipts) != len(rctCids) {
+ log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(blocks.Receipts), len(rctCids))
+ }
+ return nil
+}
+
+// fetchState fetches state nodes
+// It uses the single f.fetch method instead of the batch fetch, because it
+// needs to maintain the data's relation to state keys
+func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) error {
+ for _, stateNode := range cids.StateNodes {
+ if stateNode.CID == "" || stateNode.Key == "" {
+ continue
+ }
+ dc, err := cid.Decode(stateNode.CID)
+ if err != nil {
+ return err
+ }
+ block, err := f.fetch(dc)
+ if err != nil {
+ return err
+ }
+ blocks.StateNodes[common.HexToHash(stateNode.Key)] = block
+ }
+ return nil
+}
+
+// fetchStorage fetches storage nodes
+// It uses the single f.fetch method instead of the batch fetch, because it
+// needs to maintain the data's relation to state and storage keys
+func (f *EthIPLDFetcher) fetchStorage(cids cidWrapper, blocks *ipfsBlockWrapper) error {
+ for _, storageNode := range cids.StorageNodes {
+ if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" {
+ continue
+ }
+ dc, err := cid.Decode(storageNode.CID)
+ if err != nil {
+ return err
+ }
+ block, err := f.fetch(dc)
+ if err != nil {
+ return err
+ }
+ blocks.StorageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = block
+ }
+ return nil
+}
+
+// fetch is used to fetch a single cid
+func (f *EthIPLDFetcher) fetch(cid cid.Cid) (blocks.Block, error) {
return f.BlockService.GetBlock(context.Background(), cid)
}
-// FetchBatch is used to fetch a batch of IPFS data blocks by cid
+// fetchBatch is used to fetch a batch of IPFS data blocks by cid
// There is no guarantee all are fetched, and no error in such a case, so
// downstream we will need to confirm which CIDs were fetched in the result set
-func (f *IPLDFetcher) FetchBatch(cids []cid.Cid) []blocks.Block {
+func (f *EthIPLDFetcher) fetchBatch(cids []cid.Cid) []blocks.Block {
fetchedBlocks := make([]blocks.Block, 0, len(cids))
blockChan := f.BlockService.GetBlocks(context.Background(), cids)
for block := range blockChan {
diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/helpers.go
index de3c8c90..c2b8e15c 100644
--- a/pkg/ipfs/helpers.go
+++ b/pkg/ipfs/helpers.go
@@ -19,6 +19,8 @@ package ipfs
import (
"context"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/fsrepo"
@@ -41,3 +43,14 @@ func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) {
}
return ipfsNode.Blocks, nil
}
+
+// AddressToKey hashes an address
+func AddressToKey(address common.Address) common.Hash {
+ return crypto.Keccak256Hash(address[:])
+}
+
+// HexToKey hashes a hex (0x leading) string
+func HexToKey(hex string) common.Hash {
+ addr := common.HexToAddress(hex)
+ return crypto.Keccak256Hash(addr[:])
+}
diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go
index 7f9b4d99..f2490040 100644
--- a/pkg/ipfs/publisher.go
+++ b/pkg/ipfs/publisher.go
@@ -209,7 +209,7 @@ func (pub *Publisher) publishStorageNodes(storageNodes map[common.Hash][]Storage
return nil, errors.New("single CID expected to be returned for storage leaf")
}
storageLeafCids[addr] = append(storageLeafCids[addr], StorageNodeCID{
- Key: node.Key,
+ Key: node.Key.Hex(),
CID: storageNodeCid[0],
Leaf: node.Leaf,
})
diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go
index 1729ffa2..474e2921 100644
--- a/pkg/ipfs/repository.go
+++ b/pkg/ipfs/repository.go
@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-// Still seeing some errors from tx and storage indexing processes... due to fk constraints being broken
package ipfs
import (
@@ -24,6 +23,8 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
+// Still seeing some errors from tx and storage indexing processes... due to fk constraints being broken
+
// CIDRepository is an interface for indexing CIDPayloads
type CIDRepository interface {
Index(cidPayload *CIDPayload) error
@@ -70,17 +71,17 @@ func (repo *Repository) Index(cidPayload *CIDPayload) error {
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
var headerID int64
- err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
- ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)
+ err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4)
+ ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)
RETURNING id`,
- blockNumber, hash, cid, false).Scan(&headerID)
+ blockNumber, hash, cid, true).Scan(&headerID)
return headerID, err
}
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error {
- _, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
- ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)`,
- blockNumber, hash, cid, true)
+ _, err := tx.Queryx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4)
+ ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`,
+ blockNumber, hash, cid, false)
return err
}
@@ -91,7 +92,7 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(payload *CIDPayload, head
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5)
RETURNING id`,
- headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.To, trxCidMeta.From).Scan(&txID)
+ headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID)
if err != nil {
tx.Rollback()
return err
@@ -140,6 +141,6 @@ func (repo *Repository) indexStateAndStorageCIDs(payload *CIDPayload, headerID i
func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error {
_, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4)
ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`,
- stateID, storageCID.Key.Hex(), storageCID.CID, storageCID.Leaf)
+ stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
return err
}
diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go
new file mode 100644
index 00000000..e0352b15
--- /dev/null
+++ b/pkg/ipfs/resolver.go
@@ -0,0 +1,86 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package ipfs
+
+import (
+ "github.com/ethereum/go-ethereum/common"
+
+ "github.com/ipfs/go-block-format"
+)
+
+type IPLDResolver interface {
+ ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error)
+}
+
+type EthIPLDResolver struct{}
+
+func NewIPLDResolver() *EthIPLDResolver {
+ return &EthIPLDResolver{}
+}
+
+func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) {
+ response := new(ResponsePayload)
+ eir.resolveHeaders(ipfsBlocks.Headers, response)
+ eir.resolveTransactions(ipfsBlocks.Transactions, response)
+ eir.resolveReceipts(ipfsBlocks.Receipts, response)
+ eir.resolveState(ipfsBlocks.StateNodes, response)
+ eir.resolveStorage(ipfsBlocks.StorageNodes, response)
+ return response, nil
+}
+
+func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *ResponsePayload) {
+ for _, block := range blocks {
+ raw := block.RawData()
+ response.HeadersRlp = append(response.HeadersRlp, raw)
+ }
+}
+
+func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) {
+ for _, block := range blocks {
+ raw := block.RawData()
+ response.TransactionsRlp = append(response.TransactionsRlp, raw)
+ }
+}
+
+func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *ResponsePayload) {
+ for _, block := range blocks {
+ raw := block.RawData()
+ response.ReceiptsRlp = append(response.ReceiptsRlp, raw)
+ }
+}
+
+func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *ResponsePayload) {
+ if response.StateNodesRlp == nil {
+ response.StateNodesRlp = make(map[common.Hash][]byte)
+ }
+ for key, block := range blocks {
+ raw := block.RawData()
+ response.StateNodesRlp[key] = raw
+ }
+}
+
+func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *ResponsePayload) {
+ if response.StateNodesRlp == nil {
+ response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
+ }
+ for stateKey, storageBlocks := range blocks {
+ for storageKey, storageVal := range storageBlocks {
+ raw := storageVal.RawData()
+ response.StorageNodesRlp[stateKey][storageKey] = raw
+ }
+ }
+}
diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go
index 63fc346e..02f805b9 100644
--- a/pkg/ipfs/retreiver.go
+++ b/pkg/ipfs/retreiver.go
@@ -15,3 +15,198 @@
// along with this program. If not, see .
package ipfs
+
+import (
+ "github.com/jmoiron/sqlx"
+ "github.com/lib/pq"
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+)
+
+type CIDRetriever interface {
+ RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error)
+}
+
+type EthCIDRetriever struct {
+ db *postgres.DB
+}
+
+func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever {
+ return &EthCIDRetriever{
+ db: db,
+ }
+}
+
+func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) {
+ var blockNumber int64
+ err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ")
+ return blockNumber, err
+}
+func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) {
+ var endingBlock int64
+ var err error
+ if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock {
+ endingBlock, err = ecr.GetLastBlockNumber()
+ if err != nil {
+ return nil, err
+ }
+ }
+ cids := make([]cidWrapper, 0, endingBlock+1-streamFilters.StartingBlock)
+ tx, err := ecr.db.Beginx()
+ if err != nil {
+ return nil, err
+ }
+ for i := streamFilters.StartingBlock; i <= endingBlock; i++ {
+ cw := &cidWrapper{
+ BlockNumber: i,
+ Headers: make([]string, 0),
+ Transactions: make([]string, 0),
+ Receipts: make([]string, 0),
+ StateNodes: make([]StateNodeCID, 0),
+ StorageNodes: make([]StorageNodeCID, 0),
+ }
+ if !streamFilters.HeaderFilter.Off {
+ err = ecr.retrieveHeaderCIDs(tx, streamFilters, cw, i)
+ if err != nil {
+ tx.Rollback()
+ return nil, err
+ }
+ }
+ var trxIds []int64
+ if !streamFilters.TrxFilter.Off {
+ trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, cw, i)
+ if err != nil {
+ tx.Rollback()
+ return nil, err
+ }
+ }
+ if !streamFilters.ReceiptFilter.Off {
+ err = ecr.retrieveRctCIDs(tx, streamFilters, cw, i, trxIds)
+ if err != nil {
+ tx.Rollback()
+ return nil, err
+ }
+ }
+ if !streamFilters.StateFilter.Off {
+ err = ecr.retrieveStateCIDs(tx, streamFilters, cw, i)
+ if err != nil {
+ tx.Rollback()
+ return nil, err
+ }
+ }
+ if !streamFilters.StorageFilter.Off {
+ err = ecr.retrieveStorageCIDs(tx, streamFilters, cw, i)
+ if err != nil {
+ tx.Rollback()
+ return nil, err
+ }
+ }
+ cids = append(cids, *cw)
+ }
+
+ return cids, err
+}
+
+func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
+ var pgStr string
+ if streamFilters.HeaderFilter.FinalOnly {
+ pgStr = `SELECT cid FROM header_cids
+ WHERE block_number = $1
+ AND final IS TRUE`
+ } else {
+ pgStr = `SELECT cid FROM header_cids
+ WHERE block_number = $1`
+ }
+ return tx.Select(cids.Headers, pgStr, blockNumber)
+}
+
+func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) ([]int64, error) {
+ args := make([]interface{}, 0, 3)
+ type result struct {
+ Id int64 `db:"id"`
+ Cid string `db:"cid"`
+ }
+ results := make([]result, 0)
+ pgStr := `SELECT transaction_cids.id, transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id)
+ WHERE header_cids.block_number = $1`
+ args = append(args, blockNumber)
+ if len(streamFilters.TrxFilter.Dst) > 0 {
+ pgStr += ` AND transaction_cids.dst = ANY($2::VARCHAR(66)[])`
+ args = append(args, pq.Array(streamFilters.TrxFilter.Dst))
+ }
+ if len(streamFilters.TrxFilter.Src) > 0 {
+ pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])`
+ args = append(args, pq.Array(streamFilters.TrxFilter.Src))
+ }
+ err := tx.Select(results, pgStr, args...)
+ if err != nil {
+ return nil, err
+ }
+ ids := make([]int64, 0)
+ for _, res := range results {
+ cids.Transactions = append(cids.Transactions, res.Cid)
+ ids = append(ids, res.Id)
+ }
+ return ids, nil
+}
+
+func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64, trxIds []int64) error {
+ args := make([]interface{}, 0, 2)
+ pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
+ WHERE receipt_cids.tx_id = transaction_cids.id
+ AND transaction_cids.header_id = header_cids.id
+ AND header_cids.block_number = $1`
+ args = append(args, blockNumber)
+ if len(streamFilters.ReceiptFilter.Topic0s) > 0 {
+ pgStr += ` AND (receipt_cids.topic0s && $2::VARCHAR(66)[]`
+ args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s))
+ }
+ if len(trxIds) > 0 {
+ pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))`
+ args = append(args, pq.Array(trxIds))
+ } else {
+ pgStr += `)`
+ }
+ return tx.Select(cids.Receipts, pgStr, args...)
+}
+
+func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
+ args := make([]interface{}, 0, 2)
+ pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
+ WHERE header_cids.block_number = $1`
+ args = append(args, blockNumber)
+ addrLen := len(streamFilters.StateFilter.Addresses)
+ if addrLen > 0 {
+ keys := make([]string, 0, addrLen)
+ for _, addr := range streamFilters.StateFilter.Addresses {
+ keys = append(keys, HexToKey(addr).Hex())
+ }
+ pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
+ args = append(args, pq.Array(keys))
+ }
+ return tx.Select(cids.StateNodes, pgStr, args...)
+}
+
+func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
+ args := make([]interface{}, 0, 3)
+ pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids
+ WHERE storage_cids.state_id = state_cids.id
+ AND state_cids.header_id = header_cids.id
+ AND header_cids.block_number = $1`
+ args = append(args, blockNumber)
+ addrLen := len(streamFilters.StorageFilter.Addresses)
+ if addrLen > 0 {
+ keys := make([]string, 0, addrLen)
+ for _, addr := range streamFilters.StorageFilter.Addresses {
+ keys = append(keys, HexToKey(addr).Hex())
+ }
+ pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
+ args = append(args, pq.Array(keys))
+ }
+ if len(streamFilters.StorageFilter.StorageKeys) > 0 {
+ pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])`
+ args = append(args, pq.Array(streamFilters.StorageFilter.StorageKeys))
+ }
+ return tx.Select(cids.StorageNodes, pgStr, args...)
+}
+
+// ADD IF LEAF ONLY!!
diff --git a/pkg/ipfs/retreiver_test.go b/pkg/ipfs/retreiver_test.go
new file mode 100644
index 00000000..a0c59fc2
--- /dev/null
+++ b/pkg/ipfs/retreiver_test.go
@@ -0,0 +1 @@
+package ipfs
diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go
new file mode 100644
index 00000000..1dfe5ab1
--- /dev/null
+++ b/pkg/ipfs/screener.go
@@ -0,0 +1,218 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package ipfs
+
+import (
+ "bytes"
+
+ "github.com/ethereum/go-ethereum/core/types"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload
+type ResponseScreener interface {
+ ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error)
+}
+
+// Screener is the underlying struct for the ReponseScreener interface
+type Screener struct{}
+
+// NewResponseScreener creates a new Screener satisfyign the ReponseScreener interface
+func NewResponseScreener() *Screener {
+ return &Screener{}
+}
+
+// ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload
+func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) {
+ response := new(ResponsePayload)
+ err := s.filterHeaders(streamFilters, response, payload)
+ if err != nil {
+ return nil, err
+ }
+ txHashes, err := s.filterTransactions(streamFilters, response, payload)
+ if err != nil {
+ return nil, err
+ }
+ err = s.filerReceipts(streamFilters, response, payload, txHashes)
+ if err != nil {
+ return nil, err
+ }
+ err = s.filterState(streamFilters, response, payload)
+ if err != nil {
+ return nil, err
+ }
+ err = s.filterStorage(streamFilters, response, payload)
+ if err != nil {
+ return nil, err
+ }
+ return response, nil
+}
+
+func (s *Screener) filterHeaders(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+ if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+ response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
+ if !streamFilters.HeaderFilter.FinalOnly {
+ for _, uncle := range payload.BlockBody.Uncles {
+ uncleRlp, err := rlp.EncodeToBytes(uncle)
+ if err != nil {
+ return err
+ }
+ response.UnclesRlp = append(response.UnclesRlp, uncleRlp)
+ }
+ }
+ }
+ return nil
+}
+
+func checkRange(start, end, actual int64) bool {
+ if (end <= 0 || end >= actual) && start <= actual {
+ return true
+ }
+ return false
+}
+
+func (s *Screener) filterTransactions(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
+ trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
+ if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+ for i, trx := range payload.BlockBody.Transactions {
+ if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) {
+ trxBuffer := new(bytes.Buffer)
+ err := trx.EncodeRLP(trxBuffer)
+ if err != nil {
+ return nil, err
+ }
+ trxHashes = append(trxHashes, trx.Hash())
+ response.TransactionsRlp = append(response.TransactionsRlp, trxBuffer.Bytes())
+ }
+ }
+ }
+ return trxHashes, nil
+}
+
+func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool {
+ // If we aren't filtering for any addresses, every transaction is a go
+ if len(wantedDst) == 0 && len(wantedSrc) == 0 {
+ return true
+ }
+ for _, src := range wantedSrc {
+ if src == actualSrc {
+ return true
+ }
+ }
+ for _, dst := range wantedDst {
+ if dst == actualDst {
+ return true
+ }
+ }
+ return false
+}
+
+func (s *Screener) filerReceipts(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
+ if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+ for i, receipt := range payload.Receipts {
+ if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) {
+ receiptBuffer := new(bytes.Buffer)
+ err := receipt.EncodeRLP(receiptBuffer)
+ if err != nil {
+ return err
+ }
+ response.ReceiptsRlp = append(response.ReceiptsRlp, receiptBuffer.Bytes())
+ }
+ }
+ }
+ return nil
+}
+
+func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, wantedTrxHashes []common.Hash) bool {
+ // If we aren't filtering for any topics, all topics are a go
+ if len(wantedTopics) == 0 {
+ return true
+ }
+ for _, wantedTrxHash := range wantedTrxHashes {
+ if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) {
+ return true
+ }
+ }
+ for _, wantedTopic := range wantedTopics {
+ for _, actualTopic := range actualTopics {
+ if wantedTopic == actualTopic {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func (s *Screener) filterState(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+ response.StateNodesRlp = make(map[common.Hash][]byte)
+ if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+ keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
+ for _, addr := range streamFilters.StateFilter.Addresses {
+ keyFilter := AddressToKey(common.HexToAddress(addr))
+ keyFilters = append(keyFilters, keyFilter)
+ }
+ for key, stateNode := range payload.StateNodes {
+ if checkNodeKeys(keyFilters, key) {
+ if stateNode.Leaf || streamFilters.StateFilter.IntermediateNodes {
+ response.StateNodesRlp[key] = stateNode.Value
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
+ // If we aren't filtering for any specific keys, all nodes are a go
+ if len(wantedKeys) == 0 {
+ return true
+ }
+ for _, key := range wantedKeys {
+ if bytes.Equal(key.Bytes(), actualKey.Bytes()) {
+ return true
+ }
+ }
+ return false
+}
+
+func (s *Screener) filterStorage(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+ if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+ stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
+ for _, addr := range streamFilters.StorageFilter.Addresses {
+ keyFilter := AddressToKey(common.HexToAddress(addr))
+ stateKeyFilters = append(stateKeyFilters, keyFilter)
+ }
+ storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys))
+ for _, store := range streamFilters.StorageFilter.StorageKeys {
+ keyFilter := HexToKey(store)
+ storageKeyFilters = append(storageKeyFilters, keyFilter)
+ }
+ for stateKey, storageNodes := range payload.StorageNodes {
+ if checkNodeKeys(stateKeyFilters, stateKey) {
+ response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte)
+ for _, storageNode := range storageNodes {
+ if checkNodeKeys(storageKeyFilters, storageNode.Key) {
+ response.StorageNodesRlp[stateKey][storageNode.Key] = storageNode.Value
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/pkg/ipfs/screener_test.go b/pkg/ipfs/screener_test.go
new file mode 100644
index 00000000..a0c59fc2
--- /dev/null
+++ b/pkg/ipfs/screener_test.go
@@ -0,0 +1 @@
+package ipfs
diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go
index 9e0b2eed..7430a683 100644
--- a/pkg/ipfs/service.go
+++ b/pkg/ipfs/service.go
@@ -32,22 +32,22 @@ import (
const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
-// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
-// This is the top-level interface used by the syncAndPublish command
-type SyncPublishAndServe interface {
+// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
+// indexing all Ethereum data screening this data, and serving it up to subscribed clients
+type SyncPublishScreenAndServe interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
// Main event loop for syncAndPublish processes
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
// Main event loop for handling client pub-sub
- Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
+ ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
// Method to subscribe to receive state diff processing output
- Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params)
+ Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
}
-// Processor is the underlying struct for the SyncAndPublish interface
+// Service is the underlying struct for the SyncAndPublish interface
type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
@@ -59,6 +59,14 @@ type Service struct {
Publisher IPLDPublisher
// Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
Repository CIDRepository
+ // Interface for filtering and serving data according to subscribed clients according to their specification
+ Screener ResponseScreener
+ // Interface for fetching ETH-IPLD objects from IPFS
+ Fetcher IPLDFetcher
+ // Interface for searching and retrieving CIDs from Postgres index
+ Retriever CIDRetriever
+ // Interface for resolving ipfs blocks to their data types
+ Resolver IPLDResolver
// Chan the processor uses to subscribe to state diff payloads from the Streamer
PayloadChan chan statediff.Payload
// Used to signal shutdown of the service
@@ -68,18 +76,27 @@ type Service struct {
}
// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
-func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishAndServe, error) {
+func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishScreenAndServe, error) {
publisher, err := NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
}
+ fetcher, err := NewIPLDFetcher(ipfsPath)
+ if err != nil {
+ return nil, err
+ }
return &Service{
- Streamer: NewStateDiffStreamer(rpcClient),
- Repository: NewCIDRepository(db),
- Converter: NewPayloadConverter(ethClient),
- Publisher: publisher,
- PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
- QuitChan: qc,
+ Streamer: NewStateDiffStreamer(rpcClient),
+ Repository: NewCIDRepository(db),
+ Converter: NewPayloadConverter(ethClient),
+ Publisher: publisher,
+ Screener: NewResponseScreener(),
+ Fetcher: fetcher,
+ Retriever: NewCIDRetriever(db),
+ Resolver: NewIPLDResolver(),
+ PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
+ QuitChan: qc,
+ Subscriptions: make(map[rpc.ID]Subscription),
}, nil
}
@@ -121,6 +138,8 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
log.Error(err)
continue
}
+ // If we have a ScreenAndServe process running, forward the payload to it
+ // If the ScreenAndServe process loop is slower than this one, will it miss some incoming payloads??
select {
case forwardPayloadChan <- *ipldPayload:
default:
@@ -137,6 +156,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
case err = <-sub.Err():
log.Error(err)
case <-sap.QuitChan:
+ // If we have a ScreenAndServe process running, forward the quit signal to it
select {
case forwardQuitchan <- true:
default:
@@ -151,16 +171,19 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
return nil
}
-func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
+// ScreenAndServe is the processing loop used to screen data streamed from the state diffing eth node and send the appropriate data to a requesting client subscription
+func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
wg.Add(1)
go func() {
for {
select {
case payload := <-receivePayloadChan:
- println(payload.BlockNumber.Int64())
- // Method for using subscription parameters to filter payload and stream relevent info to sub channel
+ err := sap.processResponse(payload)
+ if err != nil {
+ log.Error(err)
+ }
case <-receiveQuitchan:
- log.Info("quiting Serve process")
+ log.Info("quiting ScreenAndServe process")
wg.Done()
return
}
@@ -168,15 +191,59 @@ func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayl
}()
}
-// Subscribe is used by the API to subscribe to the StateDiffingService loop
-func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params) {
+func (sap *Service) processResponse(payload IPLDPayload) error {
+ for id, sub := range sap.Subscriptions {
+ response, err := sap.Screener.ScreenResponse(sub.StreamFilters, payload)
+ if err != nil {
+ return err
+ }
+ sap.serve(id, *response)
+ }
+ return nil
+}
+
+// Subscribe is used by the API to subscribe to the service loop
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters) {
log.Info("Subscribing to the statediff service")
sap.Lock()
sap.Subscriptions[id] = Subscription{
- PayloadChan: sub,
- QuitChan: quitChan,
+ PayloadChan: sub,
+ QuitChan: quitChan,
+ StreamFilters: streamFilters,
}
sap.Unlock()
+ // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
+ // Otherwise we only filter new data as it is streamed in from the state diffing geth node
+ if streamFilters.BackFill {
+ // Retrieve cached CIDs relevant to this subscriber
+ cids, err := sap.Retriever.RetrieveCIDs(*streamFilters)
+ if err != nil {
+ log.Error(err)
+ sap.serve(id, ResponsePayload{
+ Err: err,
+ })
+ return
+ }
+ for _, cid := range cids {
+ blocksWrapper, err := sap.Fetcher.FetchCIDs(cid)
+ if err != nil {
+ log.Error(err)
+ sap.serve(id, ResponsePayload{
+ Err: err,
+ })
+ return
+ }
+ backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper)
+ if err != nil {
+ log.Error(err)
+ sap.serve(id, ResponsePayload{
+ Err: err,
+ })
+ return
+ }
+ sap.serve(id, *backFillIplds)
+ }
+ }
}
// Unsubscribe is used to unsubscribe to the StateDiffingService loop
@@ -192,28 +259,29 @@ func (sap *Service) Unsubscribe(id rpc.ID) error {
return nil
}
-// Start is used to begin the StateDiffingService
+// Start is used to begin the service
func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting statediff service")
wg := new(sync.WaitGroup)
payloadChan := make(chan IPLDPayload)
quitChan := make(chan bool)
- go sap.SyncAndPublish(wg, payloadChan, quitChan)
- go sap.Serve(wg, payloadChan, quitChan)
+ sap.SyncAndPublish(wg, payloadChan, quitChan)
+ sap.ScreenAndServe(wg, payloadChan, quitChan)
return nil
}
-// Stop is used to close down the StateDiffingService
+// Stop is used to close down the service
func (sap *Service) Stop() error {
log.Info("Stopping statediff service")
close(sap.QuitChan)
return nil
}
-// send is used to fan out and serve a payload to any subscriptions
-func (sap *Service) send(payload ResponsePayload) {
+// serve is used to send screened payloads to their requesting sub
+func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
sap.Lock()
- for id, sub := range sap.Subscriptions {
+ sub, ok := sap.Subscriptions[id]
+ if ok {
select {
case sub.PayloadChan <- payload:
log.Infof("sending state diff payload to subscription %s", id)
diff --git a/pkg/ipfs/test_helpers/mocks/api.go b/pkg/ipfs/test_helpers/mocks/api.go
new file mode 100644
index 00000000..f726b26e
--- /dev/null
+++ b/pkg/ipfs/test_helpers/mocks/api.go
@@ -0,0 +1 @@
+package mocks
diff --git a/pkg/ipfs/test_helpers/mocks/screener.go b/pkg/ipfs/test_helpers/mocks/screener.go
new file mode 100644
index 00000000..f726b26e
--- /dev/null
+++ b/pkg/ipfs/test_helpers/mocks/screener.go
@@ -0,0 +1 @@
+package mocks
diff --git a/pkg/ipfs/test_helpers/test_data.go b/pkg/ipfs/test_helpers/test_data.go
index 83fc7f94..8ce24d8e 100644
--- a/pkg/ipfs/test_helpers/test_data.go
+++ b/pkg/ipfs/test_helpers/test_data.go
@@ -24,18 +24,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
-// AddressToLeafKey hashes an returns an address
-func AddressToLeafKey(address common.Address) common.Hash {
- return common.BytesToHash(crypto.Keccak256(address[:]))
-}
-
// Test variables
var (
BlockNumber = big.NewInt(rand.Int63())
@@ -55,9 +49,9 @@ var (
}}
emptyStorage = make([]statediff.StorageDiff, 0)
address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592")
- ContractLeafKey = AddressToLeafKey(address)
+ ContractLeafKey = ipfs.AddressToKey(address)
anotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593")
- AnotherContractLeafKey = AddressToLeafKey(anotherAddress)
+ AnotherContractLeafKey = ipfs.AddressToKey(anotherAddress)
testAccount = state.Account{
Nonce: NewNonceValue,
Balance: big.NewInt(NewBalanceValue),
@@ -143,14 +137,14 @@ var (
HeaderCID: "mockHeaderCID",
TransactionCIDs: map[common.Hash]*ipfs.TrxMetaData{
common.HexToHash("0x0"): {
- CID: "mockTrxCID1",
- To: "mockTo1",
- From: "mockFrom1",
+ CID: "mockTrxCID1",
+ Dst: "mockTo1",
+ Src: "mockFrom1",
},
common.HexToHash("0x1"): {
- CID: "mockTrxCID2",
- To: "mockTo2",
- From: "mockFrom2",
+ CID: "mockTrxCID2",
+ Dst: "mockTo2",
+ Src: "mockFrom2",
},
},
ReceiptCIDs: map[common.Hash]*ipfs.ReceiptMetaData{
diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go
index 228cf85c..36cafb02 100644
--- a/pkg/ipfs/types.go
+++ b/pkg/ipfs/types.go
@@ -20,23 +20,28 @@ import (
"encoding/json"
"math/big"
+ "github.com/ipfs/go-block-format"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
-//
+// Subscription holds the information for an individual client subscription
type Subscription struct {
- PayloadChan chan<- ResponsePayload
- QuitChan chan<- bool
+ PayloadChan chan<- ResponsePayload
+ QuitChan chan<- bool
+ StreamFilters *StreamFilters
}
+// ResponsePayload holds the data returned from the seed node to the requesting client
type ResponsePayload struct {
- HeadersRlp [][]byte `json:"headersRlp"`
- UnclesRlp [][]byte `json:"unclesRlp"`
- TransactionsRlp [][]byte `json:"transactionsRlp"`
- ReceiptsRlp [][]byte `json:"receiptsRlp"`
- StateNodesRlp [][]byte `json:"stateNodesRlp"`
- StorageNodesRlp [][]byte `json:"storageNodesRlp"`
+ HeadersRlp [][]byte `json:"headersRlp"`
+ UnclesRlp [][]byte `json:"unclesRlp"`
+ TransactionsRlp [][]byte `json:"transactionsRlp"`
+ ReceiptsRlp [][]byte `json:"receiptsRlp"`
+ StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"`
+ StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"`
+ Err error `json:"error"`
encoded []byte
err error
@@ -60,6 +65,23 @@ func (sd *ResponsePayload) Encode() ([]byte, error) {
return sd.encoded, sd.err
}
+type cidWrapper struct {
+ BlockNumber int64
+ Headers []string
+ Transactions []string
+ Receipts []string
+ StateNodes []StateNodeCID
+ StorageNodes []StorageNodeCID
+}
+
+type ipfsBlockWrapper struct {
+ Headers []blocks.Block
+ Transactions []blocks.Block
+ Receipts []blocks.Block
+ StateNodes map[common.Hash]blocks.Block
+ StorageNodes map[common.Hash]map[common.Hash]blocks.Block
+}
+
// IPLDPayload is a custom type which packages ETH data for the IPFS publisher
type IPLDPayload struct {
HeaderRLP []byte
@@ -73,11 +95,13 @@ type IPLDPayload struct {
StorageNodes map[common.Hash][]StorageNode
}
+// StateNode struct used to flag node as leaf or not
type StateNode struct {
Value []byte
Leaf bool
}
+// StorageNode struct used to flag node as leaf or not
type StorageNode struct {
Key common.Hash
Value []byte
@@ -96,15 +120,19 @@ type CIDPayload struct {
StorageNodeCIDs map[common.Hash][]StorageNodeCID
}
+// StateNodeCID is used to associate a leaf flag with a state node cid
type StateNodeCID struct {
CID string
Leaf bool
+ Key string `db:"state_key"`
}
+// StorageNodeCID is used to associate a leaf flag with a storage node cid
type StorageNodeCID struct {
- Key common.Hash
- CID string
- Leaf bool
+ Key string `db:"storage_key"`
+ CID string
+ Leaf bool
+ StateKey string `db:"state_key"`
}
// ReceiptMetaData wraps some additional data around our receipt CIDs for indexing
@@ -115,45 +143,39 @@ type ReceiptMetaData struct {
// TrxMetaData wraps some additional data around our transaction CID for indexing
type TrxMetaData struct {
- CID string
- To string
- From string
+ CID string
+ Src string
+ Dst string
}
-// Params are set by the client to tell the server how to filter that is fed into their subscription
-type Params struct {
- HeaderFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64 // set to 0 or a negative value to have no ending block
- Uncles bool
+// StreamFilters are defined by the client to specifiy which data to receive from the seed node
+type StreamFilters struct {
+ BackFill bool
+ BackFillOnly bool
+ StartingBlock int64
+ EndingBlock int64 // set to 0 or a negative value to have no ending block
+ HeaderFilter struct {
+ Off bool
+ FinalOnly bool
}
TrxFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Src string
- Dst string
+ Off bool
+ Src []string
+ Dst []string
}
ReceiptFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Topic0s []string
+ Off bool
+ Topic0s []string
}
StateFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Address string // is converted to state key by taking its keccak256 hash
- LeafsOnly bool
+ Off bool
+ Addresses []string // is converted to state key by taking its keccak256 hash
+ IntermediateNodes bool
}
StorageFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Address string
- StorageKey string
- LeafsOnly bool
+ Off bool
+ Addresses []string
+ StorageKeys []string
+ IntermediateNodes bool
}
}