diff --git a/cmd/root.go b/cmd/root.go
index 6b45ea52..1b647625 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -40,6 +40,7 @@ var (
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
+ subConfig config.Subscription
ipc string
levelDbPath string
queueRecheckInterval time.Duration
diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go
new file mode 100644
index 00000000..fe895d49
--- /dev/null
+++ b/cmd/streamSubscribe.go
@@ -0,0 +1,214 @@
+// Copyright © 2019 Vulcanize, Inc
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cmd
+
+import (
+ "bytes"
+
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+ log "github.com/sirupsen/logrus"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+ "github.com/vulcanize/vulcanizedb/pkg/core"
+ "github.com/vulcanize/vulcanizedb/pkg/geth/client"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs"
+)
+
+// streamSubscribeCmd represents the streamSubscribe command
+var streamSubscribeCmd = &cobra.Command{
+ Use: "streamSubscribe",
+ Short: "This command is used to subscribe to the seed node stream with the provided filters",
+ Long: ``,
+ Run: func(cmd *cobra.Command, args []string) {
+ streamSubscribe()
+ },
+}
+
+func init() {
+ rootCmd.AddCommand(streamSubscribeCmd)
+}
+
+func streamSubscribe() {
+ // Prep the subscription config/filters to be sent to the server
+ subscriptionConfig()
+
+ // Create a new rpc client and a subscription streamer with that client
+ rpcClient := getRpcClient()
+ str := streamer.NewSeedStreamer(rpcClient)
+
+ // Buffered channel for reading subscription payloads
+ payloadChan := make(chan ipfs.ResponsePayload, 8000)
+
+ // Subscribe to the seed node service with the given config/filter parameters
+ sub, err := str.Stream(payloadChan, subConfig)
+ if err != nil {
+ println(err.Error())
+ log.Fatal(err)
+ }
+
+ // Receive response payloads and print out the results
+ for {
+ select {
+ case payload := <-payloadChan:
+ if payload.Err != nil {
+ log.Error(payload.Err)
+ }
+ for _, headerRlp := range payload.HeadersRlp {
+ var header types.Header
+ err = rlp.Decode(bytes.NewBuffer(headerRlp), &header)
+ if err != nil {
+ println(err.Error())
+ log.Error(err)
+ }
+ println("Header")
+ println(header.Hash().Hex())
+ println(header.Number.Int64())
+ }
+ for _, trxRlp := range payload.TransactionsRlp {
+ var trx types.Transaction
+ buff := bytes.NewBuffer(trxRlp)
+ stream := rlp.NewStream(buff, 0)
+ err := trx.DecodeRLP(stream)
+ if err != nil {
+ println(err.Error())
+ log.Error(err)
+ }
+ println("Trx")
+ println(trx.Hash().Hex())
+ }
+ for _, rctRlp := range payload.ReceiptsRlp {
+ var rct types.Receipt
+ buff := bytes.NewBuffer(rctRlp)
+ stream := rlp.NewStream(buff, 0)
+ err = rct.DecodeRLP(stream)
+ if err != nil {
+ println(err.Error())
+ log.Error(err)
+ }
+ println("Rct")
+ for _, l := range rct.Logs {
+ println("log")
+ println(l.BlockHash.Hex())
+ println(l.TxHash.Hex())
+ println(l.Address.Hex())
+ }
+ }
+ for key, stateRlp := range payload.StateNodesRlp {
+ var acct state.Account
+ err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct)
+ if err != nil {
+ println(err.Error())
+ log.Error(err)
+ }
+ println("State")
+ print("key: ")
+ println(key.Hex())
+ print("root: ")
+ println(acct.Root.Hex())
+ print("balance: ")
+ println(acct.Balance.Int64())
+ }
+ for stateKey, mappedRlp := range payload.StorageNodesRlp {
+ println("Storage")
+ print("state key: ")
+ println(stateKey.Hex())
+ for storageKey, storageRlp := range mappedRlp {
+ println("Storage")
+ print("key: ")
+ println(storageKey.Hex())
+ var i []interface{}
+ err := rlp.DecodeBytes(storageRlp, i)
+ if err != nil {
+ println(err.Error())
+ log.Error(err)
+ }
+ print("bytes: ")
+ println(storageRlp)
+ }
+ }
+ case err = <-sub.Err():
+ println(err.Error())
+ log.Fatal(err)
+ }
+ }
+}
+
+func subscriptionConfig() {
+ log.Info("loading subscription config")
+ vulcPath = viper.GetString("subscription.path")
+ subConfig = config.Subscription{
+ // Below default to false, which means we do not backfill by default
+ BackFill: viper.GetBool("subscription.backfill"),
+ BackFillOnly: viper.GetBool("subscription.backfillOnly"),
+
+ // Below default to 0
+ // 0 start means we start at the beginning and 0 end means we continue indefinitely
+ StartingBlock: viper.GetInt64("subscription.startingBlock"),
+ EndingBlock: viper.GetInt64("subscription.endingBlock"),
+
+ // Below default to false, which means we get all headers by default
+ HeaderFilter: config.HeaderFilter{
+ Off: viper.GetBool("subscription.headerFilter.off"),
+ FinalOnly: viper.GetBool("subscription.headerFilter.finalOnly"),
+ },
+
+ // Below defaults to false and two slices of length 0
+ // Which means we get all transactions by default
+ TrxFilter: config.TrxFilter{
+ Off: viper.GetBool("subscription.trxFilter.off"),
+ Src: viper.GetStringSlice("subscription.trxFilter.src"),
+ Dst: viper.GetStringSlice("subscription.trxFilter.dst"),
+ },
+
+ // Below defaults to false and one slice of length 0
+ // Which means we get all receipts by default
+ ReceiptFilter: config.ReceiptFilter{
+ Off: viper.GetBool("subscription.receiptFilter.off"),
+ Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"),
+ },
+
+ // Below defaults to two false, and a slice of length 0
+ // Which means we get all state leafs by default, but no intermediate nodes
+ StateFilter: config.StateFilter{
+ Off: viper.GetBool("subscription.stateFilter.off"),
+ IntermediateNodes: viper.GetBool("subscription.stateFilter.intermediateNodes"),
+ Addresses: viper.GetStringSlice("subscription.stateFilter.addresses"),
+ },
+
+ // Below defaults to two false, and two slices of length 0
+ // Which means we get all storage leafs by default, but no intermediate nodes
+ StorageFilter: config.StorageFilter{
+ Off: viper.GetBool("subscription.storageFilter.off"),
+ IntermediateNodes: viper.GetBool("subscription.storageFilter.intermediateNodes"),
+ Addresses: viper.GetStringSlice("subscription.storageFilter.addresses"),
+ StorageKeys: viper.GetStringSlice("subscription.storageFilter.storageKeys"),
+ },
+ }
+}
+
+func getRpcClient() core.RpcClient {
+ rawRpcClient, err := rpc.Dial(vulcPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return client.NewRpcClient(rawRpcClient, vulcPath)
+}
diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go
index b1dd454a..3b1d1f15 100644
--- a/cmd/syncAndPublish.go
+++ b/cmd/syncAndPublish.go
@@ -39,7 +39,7 @@ var syncAndPublishCmd = &cobra.Command{
Short: "Syncs all Ethereum data into IPFS, indexing the CIDs",
Long: `This command works alongside a modified geth node which streams
all block and state (diff) data over a websocket subscription. This process
-then converts the eth objects to IPLDs and publishes them IPFS. Additionally,
+then converts the eth data to IPLD objects and publishes them to IPFS. Additionally,
it maintains a local index of the IPLD objects' CIDs in Postgres.`,
Run: func(cmd *cobra.Command, args []string) {
syncAndPublish()
diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go
index 0b0d4b8a..14c53bdf 100644
--- a/cmd/syncPublishScreenAndServe.go
+++ b/cmd/syncPublishScreenAndServe.go
@@ -29,13 +29,12 @@ import (
// syncPublishScreenAndServeCmd represents the syncPublishScreenAndServe command
var syncPublishScreenAndServeCmd = &cobra.Command{
Use: "syncPublishScreenAndServe",
- Short: "A brief description of your command",
- Long: `A longer description that spans multiple lines and likely contains examples
-and usage of using your command. For example:
-
-Cobra is a CLI library for Go that empowers applications.
-This application is a tool to generate the needed files
-to quickly create a Cobra application.`,
+ Short: "Syncs all Ethereum data into IPFS, indexing the CIDs, and uses this to serve data requests to requesting clients",
+ Long: `This command works alongside a modified geth node which streams
+all block and state (diff) data over a websocket subscription. This process
+then converts the eth data to IPLD objects and publishes them to IPFS. Additionally,
+it maintains a local index of the IPLD objects' CIDs in Postgres. It then opens up a server which
+relays relevant data to requesting clients.`,
Run: func(cmd *cobra.Command, args []string) {
syncPublishScreenAndServe()
},
@@ -44,7 +43,7 @@ to quickly create a Cobra application.`,
func init() {
rootCmd.AddCommand(syncPublishScreenAndServeCmd)
syncPublishScreenAndServeCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node")
- syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "ipc-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server")
+ syncPublishScreenAndServeCmd.Flags().StringVarP(&vulcPath, "sub-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server")
}
func syncPublishScreenAndServe() {
diff --git a/cmd/test2.go b/cmd/test2.go
deleted file mode 100644
index 18e98869..00000000
--- a/cmd/test2.go
+++ /dev/null
@@ -1,144 +0,0 @@
-// Copyright © 2019 Vulcanize, Inc
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package cmd
-
-import (
- "bytes"
-
- "github.com/ethereum/go-ethereum/core/state"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/ethereum/go-ethereum/rpc"
- log "github.com/sirupsen/logrus"
- "github.com/spf13/cobra"
-
- "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
- "github.com/vulcanize/vulcanizedb/pkg/core"
- "github.com/vulcanize/vulcanizedb/pkg/geth/client"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs"
-)
-
-// test2Cmd represents the test2 command
-var test2Cmd = &cobra.Command{
- Use: "test2",
- Short: "A brief description of your command",
- Long: `A longer description that spans multiple lines and likely contains examples
-and usage of using your command. For example:
-
-Cobra is a CLI library for Go that empowers applications.
-This application is a tool to generate the needed files
-to quickly create a Cobra application.`,
- Run: func(cmd *cobra.Command, args []string) {
- test2()
- },
-}
-
-func test2() {
- rpcClient := getRpcClient()
- str := streamer.NewSeedStreamer(rpcClient)
- payloadChan := make(chan ipfs.ResponsePayload, 8000)
- streamFilters := ipfs.StreamFilters{}
- streamFilters.HeaderFilter.FinalOnly = true
- streamFilters.ReceiptFilter.Topic0s = []string{
- "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
- "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377",
- }
- streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
- streamFilters.StorageFilter.Off = true
- //streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
- //streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
- sub, err := str.Stream(payloadChan, streamFilters)
- if err != nil {
- println(err.Error())
- log.Fatal(err)
- }
- for {
- select {
- case payload := <-payloadChan:
- if payload.Err != nil {
- log.Error(payload.Err)
- }
- for _, headerRlp := range payload.HeadersRlp {
- var header types.Header
- err = rlp.Decode(bytes.NewBuffer(headerRlp), &header)
- if err != nil {
- println(err.Error())
- log.Error(err)
- }
- println("header")
- println(header.Hash().Hex())
- println(header.Number.Int64())
- }
- for _, trxRlp := range payload.TransactionsRlp {
- var trx types.Transaction
- buff := bytes.NewBuffer(trxRlp)
- stream := rlp.NewStream(buff, 0)
- err := trx.DecodeRLP(stream)
- if err != nil {
- println(err.Error())
- log.Error(err)
- }
- println("trx")
- println(trx.Hash().Hex())
- }
- for _, rctRlp := range payload.ReceiptsRlp {
- var rct types.Receipt
- buff := bytes.NewBuffer(rctRlp)
- stream := rlp.NewStream(buff, 0)
- err = rct.DecodeRLP(stream)
- if err != nil {
- println(err.Error())
- log.Error(err)
- }
- println("rct")
- for _, l := range rct.Logs {
- println("log")
- println(l.BlockHash.Hex())
- println(l.TxHash.Hex())
- println(l.Address.Hex())
- }
- }
- for _, stateRlp := range payload.StateNodesRlp {
- var acct state.Account
- err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct)
- if err != nil {
- println(err.Error())
- log.Error(err)
- }
- println("state")
- println(acct.Root.Hex())
- println(acct.Balance.Int64())
- }
- case err = <-sub.Err():
- println(err.Error())
- log.Fatal(err)
- }
- }
-}
-
-func init() {
- rootCmd.AddCommand(test2Cmd)
- test2Cmd.Flags().StringVarP(&vulcPath, "ipc-path", "p", "~/.vulcanize/vulcanize.ipc", "IPC path for the Vulcanize seed node server")
-}
-
-func getRpcClient() core.RpcClient {
- println(vulcPath)
- rawRpcClient, err := rpc.Dial(vulcPath)
- if err != nil {
- log.Fatal(err)
- }
- return client.NewRpcClient(rawRpcClient, vulcPath)
-}
diff --git a/libraries/shared/streamer/streamer.go b/libraries/shared/streamer/streamer.go
index f296e469..5d74a449 100644
--- a/libraries/shared/streamer/streamer.go
+++ b/libraries/shared/streamer/streamer.go
@@ -19,6 +19,7 @@ package streamer
import (
"github.com/ethereum/go-ethereum/rpc"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
@@ -26,7 +27,7 @@ import (
// IStreamer is the interface for streaming data from a vulcanizeDB seed node
type IStreamer interface {
- Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error)
+ Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
}
// Streamer is the underlying struct for the IStreamer interface
@@ -42,6 +43,6 @@ func NewSeedStreamer(client core.RpcClient) *Streamer {
}
// Stream is the main loop for subscribing to data from a vulcanizedb seed node
-func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) {
+func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters)
}
diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go
new file mode 100644
index 00000000..50949c9b
--- /dev/null
+++ b/pkg/config/subscription.go
@@ -0,0 +1,43 @@
+package config
+
+// Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node
+type Subscription struct {
+ BackFill bool
+ BackFillOnly bool
+ StartingBlock int64
+ EndingBlock int64 // set to 0 or a negative value to have no ending block
+ HeaderFilter HeaderFilter
+ TrxFilter TrxFilter
+ ReceiptFilter ReceiptFilter
+ StateFilter StateFilter
+ StorageFilter StorageFilter
+}
+
+type HeaderFilter struct {
+ Off bool
+ FinalOnly bool
+}
+
+type TrxFilter struct {
+ Off bool
+ Src []string
+ Dst []string
+}
+
+type ReceiptFilter struct {
+ Off bool
+ Topic0s []string
+}
+
+type StateFilter struct {
+ Off bool
+ Addresses []string // is converted to state key by taking its keccak256 hash
+ IntermediateNodes bool
+}
+
+type StorageFilter struct {
+ Off bool
+ Addresses []string
+ StorageKeys []string
+ IntermediateNodes bool
+}
\ No newline at end of file
diff --git a/pkg/config/subscription_test.go b/pkg/config/subscription_test.go
new file mode 100644
index 00000000..d912156b
--- /dev/null
+++ b/pkg/config/subscription_test.go
@@ -0,0 +1 @@
+package config
diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go
index cd42252c..150ece3f 100644
--- a/pkg/ipfs/api.go
+++ b/pkg/ipfs/api.go
@@ -19,6 +19,8 @@ package ipfs
import (
"context"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
@@ -42,7 +44,7 @@ func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI {
}
// Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created
-func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters StreamFilters) (*rpc.Subscription, error) {
+func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.Subscription) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go
index 859d215d..88b8c403 100644
--- a/pkg/ipfs/converter_test.go
+++ b/pkg/ipfs/converter_test.go
@@ -20,19 +20,19 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks"
)
var _ = Describe("Converter", func() {
Describe("Convert", func() {
It("Converts StatediffPayloads into IPLDPayloads", func() {
mockConverter := mocks.PayloadConverter{}
- mockConverter.ReturnIPLDPayload = &test_helpers.MockIPLDPayload
- ipldPayload, err := mockConverter.Convert(test_helpers.MockStatediffPayload)
+ mockConverter.ReturnIPLDPayload = &helpers.MockIPLDPayload
+ ipldPayload, err := mockConverter.Convert(helpers.MockStatediffPayload)
Expect(err).ToNot(HaveOccurred())
- Expect(ipldPayload).To(Equal(&test_helpers.MockIPLDPayload))
- Expect(mockConverter.PassedStatediffPayload).To(Equal(test_helpers.MockStatediffPayload))
+ Expect(ipldPayload).To(Equal(&helpers.MockIPLDPayload))
+ Expect(mockConverter.PassedStatediffPayload).To(Equal(helpers.MockStatediffPayload))
})
})
})
diff --git a/pkg/ipfs/test_helpers/mocks/api.go b/pkg/ipfs/helpers/mocks/api.go
similarity index 100%
rename from pkg/ipfs/test_helpers/mocks/api.go
rename to pkg/ipfs/helpers/mocks/api.go
diff --git a/pkg/ipfs/test_helpers/mocks/converter.go b/pkg/ipfs/helpers/mocks/converter.go
similarity index 100%
rename from pkg/ipfs/test_helpers/mocks/converter.go
rename to pkg/ipfs/helpers/mocks/converter.go
diff --git a/pkg/ipfs/test_helpers/mocks/publisher.go b/pkg/ipfs/helpers/mocks/publisher.go
similarity index 100%
rename from pkg/ipfs/test_helpers/mocks/publisher.go
rename to pkg/ipfs/helpers/mocks/publisher.go
diff --git a/pkg/ipfs/helpers/mocks/repository.go b/pkg/ipfs/helpers/mocks/repository.go
new file mode 100644
index 00000000..dc06ccc9
--- /dev/null
+++ b/pkg/ipfs/helpers/mocks/repository.go
@@ -0,0 +1,31 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package mocks
+
+import "github.com/vulcanize/vulcanizedb/pkg/ipfs"
+
+// CIDRepository is the underlying struct for the Repository interface
+type CIDRepository struct {
+ PassedCIDPayload *ipfs.CIDPayload
+ ReturnErr error
+}
+
+// Index indexes a cidPayload in Postgres
+func (repo *CIDRepository) Index(cidPayload *ipfs.CIDPayload) error {
+ repo.PassedCIDPayload = cidPayload
+ return repo.ReturnErr
+}
diff --git a/pkg/ipfs/test_helpers/mocks/screener.go b/pkg/ipfs/helpers/mocks/screener.go
similarity index 100%
rename from pkg/ipfs/test_helpers/mocks/screener.go
rename to pkg/ipfs/helpers/mocks/screener.go
diff --git a/pkg/ipfs/test_helpers/mocks/streamer.go b/pkg/ipfs/helpers/mocks/streamer.go
similarity index 100%
rename from pkg/ipfs/test_helpers/mocks/streamer.go
rename to pkg/ipfs/helpers/mocks/streamer.go
diff --git a/pkg/ipfs/test_helpers/test_data.go b/pkg/ipfs/helpers/test_data.go
similarity index 99%
rename from pkg/ipfs/test_helpers/test_data.go
rename to pkg/ipfs/helpers/test_data.go
index 2401a970..33daefd0 100644
--- a/pkg/ipfs/test_helpers/test_data.go
+++ b/pkg/ipfs/helpers/test_data.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-package test_helpers
+package helpers
import (
"errors"
diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go
index 744f886e..2e14a814 100644
--- a/pkg/ipfs/publisher_test.go
+++ b/pkg/ipfs/publisher_test.go
@@ -20,19 +20,19 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks"
)
var _ = Describe("Publisher", func() {
Describe("Publish", func() {
It("Publishes IPLDPayload to IPFS", func() {
mockPublisher := mocks.IPLDPublisher{}
- mockPublisher.ReturnCIDPayload = &test_helpers.MockCIDPayload
- cidPayload, err := mockPublisher.Publish(&test_helpers.MockIPLDPayload)
+ mockPublisher.ReturnCIDPayload = &helpers.MockCIDPayload
+ cidPayload, err := mockPublisher.Publish(&helpers.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
- Expect(cidPayload).To(Equal(&test_helpers.MockCIDPayload))
- Expect(mockPublisher.PassedIPLDPayload).To(Equal(&test_helpers.MockIPLDPayload))
+ Expect(cidPayload).To(Equal(&helpers.MockCIDPayload))
+ Expect(mockPublisher.PassedIPLDPayload).To(Equal(&helpers.MockIPLDPayload))
})
})
})
diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go
index 474e2921..a433eb71 100644
--- a/pkg/ipfs/repository.go
+++ b/pkg/ipfs/repository.go
@@ -17,6 +17,7 @@
package ipfs
import (
+ "github.com/i-norden/go-ethereum/core"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
@@ -144,3 +145,7 @@ func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID,
stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
return err
}
+
+type RepositoryError struct {
+ core.Message
+}
diff --git a/pkg/ipfs/repository_test.go b/pkg/ipfs/repository_test.go
index bc5829a1..0f127224 100644
--- a/pkg/ipfs/repository_test.go
+++ b/pkg/ipfs/repository_test.go
@@ -20,17 +20,17 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks"
)
var _ = Describe("Repository", func() {
Describe("Index", func() {
It("Indexes CIDs against their metadata", func() {
mockRepo := mocks.CIDRepository{}
- err := mockRepo.Index(&test_helpers.MockCIDPayload)
+ err := mockRepo.Index(&helpers.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
- Expect(mockRepo.PassedCIDPayload).To(Equal(&test_helpers.MockCIDPayload))
+ Expect(mockRepo.PassedCIDPayload).To(Equal(&helpers.MockCIDPayload))
})
})
})
diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go
index e01cd221..cb113b0f 100644
--- a/pkg/ipfs/retreiver.go
+++ b/pkg/ipfs/retreiver.go
@@ -19,12 +19,13 @@ package ipfs
import (
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
// CIDRetriever is the interface for retrieving CIDs from the Postgres cache
type CIDRetriever interface {
- RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error)
+ RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error)
}
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
@@ -47,7 +48,7 @@ func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) {
}
// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters
-func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) {
+func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error) {
var endingBlock int64
var err error
if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock {
@@ -112,7 +113,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrap
return cids, err
}
-func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
+func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error {
var pgStr string
if streamFilters.HeaderFilter.FinalOnly {
pgStr = `SELECT cid FROM header_cids
@@ -125,7 +126,7 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters Stream
return tx.Select(cids.Headers, pgStr, blockNumber)
}
-func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) ([]int64, error) {
+func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) ([]int64, error) {
args := make([]interface{}, 0, 3)
type result struct {
ID int64 `db:"id"`
@@ -155,7 +156,7 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFil
return ids, nil
}
-func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64, trxIds []int64) error {
+func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64, trxIds []int64) error {
args := make([]interface{}, 0, 2)
pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
@@ -175,7 +176,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFil
return tx.Select(cids.Receipts, pgStr, args...)
}
-func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
+func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error {
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
@@ -192,7 +193,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamF
return tx.Select(cids.StateNodes, pgStr, args...)
}
-func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
+func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, cids *CidWrapper, blockNumber int64) error {
args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids
WHERE storage_cids.state_id = state_cids.id
diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go
index 1dfe5ab1..5468870c 100644
--- a/pkg/ipfs/screener.go
+++ b/pkg/ipfs/screener.go
@@ -19,6 +19,8 @@ package ipfs
import (
"bytes"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
@@ -27,7 +29,7 @@ import (
// ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload
type ResponseScreener interface {
- ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error)
+ ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
}
// Screener is the underlying struct for the ReponseScreener interface
@@ -39,7 +41,7 @@ func NewResponseScreener() *Screener {
}
// ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload
-func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayload) (*ResponsePayload, error) {
+func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
response := new(ResponsePayload)
err := s.filterHeaders(streamFilters, response, payload)
if err != nil {
@@ -64,7 +66,7 @@ func (s *Screener) ScreenResponse(streamFilters *StreamFilters, payload IPLDPayl
return response, nil
}
-func (s *Screener) filterHeaders(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+func (s *Screener) filterHeaders(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if !streamFilters.HeaderFilter.FinalOnly {
@@ -87,7 +89,7 @@ func checkRange(start, end, actual int64) bool {
return false
}
-func (s *Screener) filterTransactions(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
+func (s *Screener) filterTransactions(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
for i, trx := range payload.BlockBody.Transactions {
@@ -123,7 +125,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false
}
-func (s *Screener) filerReceipts(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
+func (s *Screener) filerReceipts(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) {
@@ -159,7 +161,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, want
return false
}
-func (s *Screener) filterState(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+func (s *Screener) filterState(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
response.StateNodesRlp = make(map[common.Hash][]byte)
if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
@@ -191,7 +193,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false
}
-func (s *Screener) filterStorage(streamFilters *StreamFilters, response *ResponsePayload, payload IPLDPayload) error {
+func (s *Screener) filterStorage(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
for _, addr := range streamFilters.StorageFilter.Addresses {
diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go
index aa325562..062a2175 100644
--- a/pkg/ipfs/service.go
+++ b/pkg/ipfs/service.go
@@ -20,6 +20,8 @@ import (
"fmt"
"sync"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
@@ -42,7 +44,7 @@ type SyncPublishScreenAndServe interface {
// Main event loop for handling client pub-sub
ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
// Method to subscribe to receive state diff processing output
- Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters)
+ Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
}
@@ -202,7 +204,7 @@ func (sap *Service) processResponse(payload IPLDPayload) error {
}
// Subscribe is used by the API to subscribe to the service loop
-func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *StreamFilters) {
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) {
log.Info("Subscribing to the statediff service")
sap.Lock()
sap.Subscriptions[id] = Subscription{
diff --git a/pkg/ipfs/service_test.go b/pkg/ipfs/service_test.go
index 40dbce90..d5f95071 100644
--- a/pkg/ipfs/service_test.go
+++ b/pkg/ipfs/service_test.go
@@ -26,8 +26,8 @@ import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks"
)
var _ = Describe("Service", func() {
@@ -41,18 +41,18 @@ var _ = Describe("Service", func() {
ReturnErr: nil,
}
mockPublisher := &mocks.IPLDPublisher{
- ReturnCIDPayload: &test_helpers.MockCIDPayload,
+ ReturnCIDPayload: &helpers.MockCIDPayload,
ReturnErr: nil,
}
mockStreamer := &mocks.StateDiffStreamer{
ReturnSub: &rpc.ClientSubscription{},
StreamPayloads: []statediff.Payload{
- test_helpers.MockStatediffPayload,
+ helpers.MockStatediffPayload,
},
ReturnErr: nil,
}
mockConverter := &mocks.PayloadConverter{
- ReturnIPLDPayload: &test_helpers.MockIPLDPayload,
+ ReturnIPLDPayload: &helpers.MockIPLDPayload,
ReturnErr: nil,
}
processor := &ipfs.Service{
@@ -68,9 +68,9 @@ var _ = Describe("Service", func() {
time.Sleep(2 * time.Second)
quitChan <- true
wg.Wait()
- Expect(mockConverter.PassedStatediffPayload).To(Equal(test_helpers.MockStatediffPayload))
- Expect(mockCidRepo.PassedCIDPayload).To(Equal(&test_helpers.MockCIDPayload))
- Expect(mockPublisher.PassedIPLDPayload).To(Equal(&test_helpers.MockIPLDPayload))
+ Expect(mockConverter.PassedStatediffPayload).To(Equal(helpers.MockStatediffPayload))
+ Expect(mockCidRepo.PassedCIDPayload).To(Equal(&helpers.MockCIDPayload))
+ Expect(mockPublisher.PassedIPLDPayload).To(Equal(&helpers.MockIPLDPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
})
})
diff --git a/pkg/ipfs/streamer_test.go b/pkg/ipfs/streamer_test.go
index 1378fa3e..acbc506e 100644
--- a/pkg/ipfs/streamer_test.go
+++ b/pkg/ipfs/streamer_test.go
@@ -22,8 +22,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers"
- "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers"
+ "github.com/vulcanize/vulcanizedb/pkg/ipfs/helpers/mocks"
)
var _ = Describe("Streamer", func() {
@@ -32,7 +32,7 @@ var _ = Describe("Streamer", func() {
mockStreamer := mocks.StateDiffStreamer{}
mockStreamer.ReturnSub = &rpc.ClientSubscription{}
mockStreamer.StreamPayloads = []statediff.Payload{
- test_helpers.MockStatediffPayload,
+ helpers.MockStatediffPayload,
}
payloadChan := make(chan statediff.Payload, 1)
sub, err := mockStreamer.Stream(payloadChan)
@@ -40,7 +40,7 @@ var _ = Describe("Streamer", func() {
Expect(sub).To(Equal(&rpc.ClientSubscription{}))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
streamedPayload := <-payloadChan
- Expect(streamedPayload).To(Equal(test_helpers.MockStatediffPayload))
+ Expect(streamedPayload).To(Equal(helpers.MockStatediffPayload))
})
})
})
diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go
index 904c5efc..bdf23113 100644
--- a/pkg/ipfs/types.go
+++ b/pkg/ipfs/types.go
@@ -20,6 +20,8 @@ import (
"encoding/json"
"math/big"
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+
"github.com/ipfs/go-block-format"
"github.com/ethereum/go-ethereum/common"
@@ -30,7 +32,7 @@ import (
type Subscription struct {
PayloadChan chan<- ResponsePayload
QuitChan chan<- bool
- StreamFilters *StreamFilters
+ StreamFilters *config.Subscription
}
// ResponsePayload holds the data returned from the seed node to the requesting client
@@ -149,35 +151,3 @@ type TrxMetaData struct {
Src string
Dst string
}
-
-// StreamFilters are defined by the client to specifiy which data to receive from the seed node
-type StreamFilters struct {
- BackFill bool
- BackFillOnly bool
- StartingBlock int64
- EndingBlock int64 // set to 0 or a negative value to have no ending block
- HeaderFilter struct {
- Off bool
- FinalOnly bool
- }
- TrxFilter struct {
- Off bool
- Src []string
- Dst []string
- }
- ReceiptFilter struct {
- Off bool
- Topic0s []string
- }
- StateFilter struct {
- Off bool
- Addresses []string // is converted to state key by taking its keccak256 hash
- IntermediateNodes bool
- }
- StorageFilter struct {
- Off bool
- Addresses []string
- StorageKeys []string
- IntermediateNodes bool
- }
-}