review fixes

This commit is contained in:
Ian Norden 2019-08-05 12:56:15 -05:00
parent 2244d1869f
commit 0bbb7a30d1
34 changed files with 253 additions and 469 deletions

View File

@ -40,7 +40,7 @@ var (
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
subConfig config.Subscription
subscriptionConfig config.Subscription
ipc string
levelDbPath string
queueRecheckInterval time.Duration

View File

@ -33,14 +33,14 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
// streamSubscribeCmd represents the streamSubscribe command
var streamSubscribeCmd = &cobra.Command{
Use: "streamSubscribe",
Short: "This command is used to subscribe to the seed node stream with the provided filters",
Long: ``,
Long: `This command is for demo and testing purposes and is used to subscribe to the seed node with the provided subscription configuration parameters.
It does not do anything with the data streamed from the seed node other than unpack it and print it out for demonstration purposes.`,
Run: func(cmd *cobra.Command, args []string) {
streamSubscribe()
},
@ -52,17 +52,17 @@ func init() {
func streamSubscribe() {
// Prep the subscription config/filters to be sent to the server
subscriptionConfig()
configureSubscription()
// Create a new rpc client and a subscription streamer with that client
rpcClient := getRpcClient()
str := streamer.NewSeedStreamer(rpcClient)
str := streamer.NewSeedNodeStreamer(rpcClient)
// Buffered channel for reading subscription payloads
payloadChan := make(chan ipfs.ResponsePayload, 20000)
payloadChan := make(chan streamer.SeedNodePayload, 20000)
// Subscribe to the seed node service with the given config/filter parameters
sub, err := str.Stream(payloadChan, subConfig)
sub, err := str.Stream(payloadChan, subscriptionConfig)
if err != nil {
log.Fatal(err)
}
@ -161,9 +161,9 @@ func streamSubscribe() {
}
}
func subscriptionConfig() {
func configureSubscription() {
log.Info("loading subscription config")
subConfig = config.Subscription{
subscriptionConfig = config.Subscription{
// Below default to false, which means we do not backfill by default
BackFill: viper.GetBool("subscription.backfill"),
BackFillOnly: viper.GetBool("subscription.backfillOnly"),

View File

@ -20,6 +20,8 @@ import (
"path/filepath"
syn "sync"
"github.com/vulcanize/vulcanizedb/pkg/seed_node"
"github.com/spf13/viper"
log "github.com/sirupsen/logrus"
@ -33,7 +35,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth/client"
vRpc "github.com/vulcanize/vulcanizedb/pkg/geth/converters/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/utils"
)
@ -68,7 +69,7 @@ func syncAndPublish() {
}
ipfsPath = filepath.Join(home, ".ipfs")
}
processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
if err != nil {
log.Fatal(err)
}

View File

@ -20,6 +20,8 @@ import (
"path/filepath"
syn "sync"
"github.com/vulcanize/vulcanizedb/pkg/seed_node"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -61,7 +63,7 @@ func syncPublishScreenAndServe() {
}
ipfsPath = filepath.Join(home, ".ipfs")
}
processor, err := ipfs.NewIPFSProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
processor, err := seed_node.NewProcessor(ipfsPath, &db, ethClient, rpcClient, quitChan)
if err != nil {
log.Fatal(err)
}
@ -94,7 +96,9 @@ func syncPublishScreenAndServe() {
if wsEndpoint == "" {
wsEndpoint = "127.0.0.1:80"
}
_, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, nil, true)
var exposeAll = true
var wsOrigins []string = nil
_, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, wsOrigins, exposeAll)
if err != nil {
log.Fatal(err)
}

View File

@ -1,5 +1,5 @@
-- +goose Up
CREATE TABLE public.blocks (
CREATE TABLE IF NOT EXISTS public.blocks (
key TEXT UNIQUE NOT NULL,
data BYTEA NOT NULL
);

View File

@ -14,7 +14,7 @@ conform to the
[standard-readme specification](https://github.com/RichardLitt/standard-readme).
- Once a Pull Request has received two approvals it can be merged in by a core developer.
Pull requests should be opened against the `master` branch. Periodically, updates on `master` will be ported over to `staging` for tagged release.
Pull requests should be opened against the `staging` branch. Periodically, updates on `staging` will be ported over to `master` for tagged release.
## Creating a new migration file
1. `make new_migration NAME=add_columnA_to_table1`

View File

@ -131,8 +131,8 @@ when the geth sync was started), and `client.ipfsPath` which is the path the ipf
#### syncPublishScreenAndServe
`syncPublishScreenAndServe` does everythin th at `syncAndPublish` does, plut it opens up an RPC server which exposes
an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to thier transformations
`syncPublishScreenAndServe` does everything that `syncAndPublish` does, plus it opens up an RPC server which exposes
an endpoint to allow transformers to subscribe to subsets of the sync-and-published data that are relevant to their transformations
Usage:
@ -152,11 +152,12 @@ The config file for the `syncPublishScreenAndServe` command has two additional f
[server]
ipcPath = "/Users/user/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:2019"
wsEndpoint = "127.0.0.1:80"
```
The additional `server.ipcPath` and `server.wsEndpoint` fields are used to set what ipc endpoint and ws url
the `syncPublishScreenAndServe` rpc server will expose itself to subscribing transformers over, respectively.
Any valid and available path and endpoint is acceptable, but keep in mind that this path and endpoint need to be known by transformers for them to subscribe to the seed node.
#### Subscribing

View File

@ -0,0 +1,84 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Streamer is used by watchers to stream eth data from a vulcanizedb seed node
package streamer
import (
"encoding/json"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
)
// ISeedNodeStreamer is the interface for streaming data from a vulcanizeDB seed node
type ISeedNodeStreamer interface {
Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
}
// SeedNodeStreamer is the underlying struct for the ISeedNodeStreamer interface
type SeedNodeStreamer struct {
Client core.RpcClient
}
// NewSeedNodeStreamer creates a pointer to a new SeedNodeStreamer which satisfies the ISeedNodeStreamer interface
func NewSeedNodeStreamer(client core.RpcClient) *SeedNodeStreamer {
return &SeedNodeStreamer{
Client: client,
}
}
// Stream is the main loop for subscribing to data from a vulcanizedb seed node
func (sds *SeedNodeStreamer) Stream(payloadChan chan SeedNodePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters)
}
// Payload holds the data returned from the seed node to the requesting client
type SeedNodePayload struct {
BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
TransactionsRlp [][]byte `json:"transactionsRlp"`
ReceiptsRlp [][]byte `json:"receiptsRlp"`
StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"`
StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"`
ErrMsg string `json:"errMsg"`
encoded []byte
err error
}
func (sd *SeedNodePayload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
func (sd *SeedNodePayload) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sd *SeedNodePayload) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}

View File

@ -1,48 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Streamer is used by watchers to stream eth data from a vulcanizedb seed node
package streamer
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
// IStreamer is the interface for streaming data from a vulcanizeDB seed node
type IStreamer interface {
Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error)
}
// Streamer is the underlying struct for the IStreamer interface
type Streamer struct {
Client core.RpcClient
}
// NewSeedStreamer creates a pointer to a new Streamer which satisfies the IStreamer interface
func NewSeedStreamer(client core.RpcClient) *Streamer {
return &Streamer{
Client: client,
}
}
// Stream is the main loop for subscribing to data from a vulcanizedb seed node
func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters config.Subscription) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters)
}

View File

@ -49,6 +49,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Unpack block rlp to access fields
block := new(types.Block)
err := rlp.DecodeBytes(payload.BlockRlp, block)
if err != nil {
return nil, err
}
header := block.Header()
headerRlp, err := rlp.EncodeToBytes(header)
if err != nil {
@ -74,7 +77,7 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
}
txMeta := &TrxMetaData{
Dst: handleNullAddr(trx.To()),
Src: from.Hex(),
Src: handleNullAddr(&from),
}
// txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody
convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta)

View File

@ -1,37 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
)
var _ = Describe("Converter", func() {
Describe("Convert", func() {
It("Converts StatediffPayloads into IPLDPayloads", func() {
mockConverter := mocks.PayloadConverter{}
mockConverter.ReturnIPLDPayload = &mocks.MockIPLDPayload
ipldPayload, err := mockConverter.Convert(mocks.MockStatediffPayload)
Expect(err).ToNot(HaveOccurred())
Expect(ipldPayload).To(Equal(&mocks.MockIPLDPayload))
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStatediffPayload))
})
})
})

View File

@ -28,7 +28,7 @@ import (
// IPLDFetcher is an interface for fetching IPLDs
type IPLDFetcher interface {
FetchCIDs(cids CidWrapper) (*IpldWrapper, error)
FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error)
}
// EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS
@ -47,11 +47,11 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
}, nil
}
// FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper
func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
// FetchCIDs is the exported method for fetching and returning all the cids passed in a CIDWrapper
func (f *EthIPLDFetcher) FetchCIDs(cids CIDWrapper) (*IPLDWrapper, error) {
log.Debug("fetching iplds")
blocks := &IpldWrapper{
blocks := &IPLDWrapper{
BlockNumber: cids.BlockNumber,
Headers: make([]blocks.Block, 0),
Uncles: make([]blocks.Block, 0),
@ -91,7 +91,7 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
// fetchHeaders fetches headers
// It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchHeaders(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching header iplds")
headerCids := make([]cid.Cid, 0, len(cids.Headers))
for _, c := range cids.Headers {
@ -110,7 +110,7 @@ func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) erro
// fetchUncles fetches uncles
// It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchUncles(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching uncle iplds")
uncleCids := make([]cid.Cid, 0, len(cids.Uncles))
for _, c := range cids.Uncles {
@ -129,7 +129,7 @@ func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error
// fetchTrxs fetches transactions
// It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchTrxs(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching transaction iplds")
trxCids := make([]cid.Cid, 0, len(cids.Transactions))
for _, c := range cids.Transactions {
@ -148,7 +148,7 @@ func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error {
// fetchRcts fetches receipts
// It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchRcts(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching receipt iplds")
rctCids := make([]cid.Cid, 0, len(cids.Receipts))
for _, c := range cids.Receipts {
@ -168,7 +168,7 @@ func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error {
// fetchState fetches state nodes
// It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state keys
func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchState(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching state iplds")
for _, stateNode := range cids.StateNodes {
if stateNode.CID == "" || stateNode.Key == "" {
@ -190,7 +190,7 @@ func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error
// fetchStorage fetches storage nodes
// It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state and storage keys
func (f *EthIPLDFetcher) fetchStorage(cids CidWrapper, blocks *IpldWrapper) error {
func (f *EthIPLDFetcher) fetchStorage(cids CIDWrapper, blocks *IPLDWrapper) error {
log.Debug("fetching storage iplds")
for _, storageNode := range cids.StorageNodes {
if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" {

View File

@ -55,8 +55,9 @@ func HexToKey(hex string) common.Hash {
return crypto.Keccak256Hash(addr[:])
}
func emptyCidWrapper(cids CidWrapper) bool {
if len(cids.Transactions) > 0 || len(cids.Headers) > 0 || len(cids.Uncles) > 0 || len(cids.Receipts) > 0 || len(cids.StateNodes) > 0 || len(cids.StorageNodes) > 0 || cids.BlockNumber == nil {
// EmptyCIDWrapper returns whether or not the provided CIDWrapper has any Cids we need to process
func EmptyCIDWrapper(cids CIDWrapper) bool {
if len(cids.Transactions) > 0 || len(cids.Headers) > 0 || len(cids.Uncles) > 0 || len(cids.Receipts) > 0 || len(cids.StateNodes) > 0 || len(cids.StorageNodes) > 0 {
return false
}
return true

View File

@ -1 +0,0 @@
package mocks

View File

@ -1 +0,0 @@
package mocks

View File

@ -1,43 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
)
// StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
PassedPayloadChan chan statediff.Payload
ReturnSub *rpc.ClientSubscription
ReturnErr error
StreamPayloads []statediff.Payload
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
sds.PassedPayloadChan = payloadChan
go func() {
for _, payload := range sds.StreamPayloads {
sds.PassedPayloadChan <- payload
}
}()
return sds.ReturnSub, sds.ReturnErr
}

View File

@ -21,7 +21,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
rlp2 "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/vulcanize/eth-block-extractor/pkg/ipfs"
@ -30,7 +30,7 @@ import (
"github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_transactions"
"github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_state_trie"
"github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_storage_trie"
"github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp"
rlp2 "github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp"
)
// IPLDPublisher is the interface for publishing an IPLD payload
@ -66,7 +66,7 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) {
return nil, err
}
return &Publisher{
HeaderPutter: eth_block_header.NewBlockHeaderDagPutter(node, rlp.RlpDecoder{}),
HeaderPutter: eth_block_header.NewBlockHeaderDagPutter(node, rlp2.RlpDecoder{}),
TransactionPutter: eth_block_transactions.NewBlockTransactionsDagPutter(node),
ReceiptPutter: eth_block_receipts.NewEthBlockReceiptDagPutter(node),
StatePutter: eth_state_trie.NewStateTrieDagPutter(node),
@ -85,7 +85,7 @@ func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
// Process and publish uncles
uncleCids := make(map[common.Hash]string)
for _, uncle := range payload.BlockBody.Uncles {
uncleRlp, err := rlp2.EncodeToBytes(uncle)
uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil {
return nil, err
}

View File

@ -1,37 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
)
var _ = Describe("Publisher", func() {
Describe("Publish", func() {
It("Publishes IPLDPayload to IPFS", func() {
mockPublisher := mocks.IPLDPublisher{}
mockPublisher.ReturnCIDPayload = &mocks.MockCIDPayload
cidPayload, err := mockPublisher.Publish(&mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(cidPayload).To(Equal(&mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(&mocks.MockIPLDPayload))
})
})
})

View File

@ -1,35 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
)
var _ = Describe("Repository", func() {
Describe("Index", func() {
It("Indexes CIDs against their metadata", func() {
mockRepo := mocks.CIDRepository{}
err := mockRepo.Index(&mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(mockRepo.PassedCIDPayload).To(Equal(&mocks.MockCIDPayload))
})
})
})

View File

@ -19,11 +19,12 @@ package ipfs
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
)
// IPLDResolver is the interface to resolving IPLDs
type IPLDResolver interface {
ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error)
ResolveIPLDs(ipfsBlocks IPLDWrapper) (*streamer.SeedNodePayload, error)
}
// EthIPLDResolver is the underlying struct to support the IPLDResolver interface
@ -35,8 +36,8 @@ func NewIPLDResolver() *EthIPLDResolver {
}
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) {
response := new(ResponsePayload)
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) (*streamer.SeedNodePayload, error) {
response := new(streamer.SeedNodePayload)
response.BlockNumber = ipfsBlocks.BlockNumber
eir.resolveHeaders(ipfsBlocks.Headers, response)
eir.resolveUncles(ipfsBlocks.Uncles, response)
@ -47,35 +48,35 @@ func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePaylo
return response, nil
}
func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SeedNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.HeadersRlp = append(response.HeadersRlp, raw)
}
}
func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SeedNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.UnclesRlp = append(response.UnclesRlp, raw)
}
}
func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SeedNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.TransactionsRlp = append(response.TransactionsRlp, raw)
}
}
func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SeedNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.ReceiptsRlp = append(response.ReceiptsRlp, raw)
}
}
func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) {
if response.StateNodesRlp == nil {
response.StateNodesRlp = make(map[common.Hash][]byte)
}
@ -85,7 +86,7 @@ func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, re
}
}
func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *ResponsePayload) {
func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SeedNodePayload) {
if response.StateNodesRlp == nil {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
}

View File

@ -1 +0,0 @@
package ipfs

View File

@ -1 +0,0 @@
package ipfs

View File

@ -1,46 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/core"
)
// StateDiffStreamer is the interface for streaming a statediff subscription
type StateDiffStreamer interface {
Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
}
// Streamer is the underlying struct for the StateDiffStreamer interface
type Streamer struct {
Client core.RpcClient
}
// NewStateDiffStreamer creates a pointer to a new Streamer which satisfies the StateDiffStreamer interface
func NewStateDiffStreamer(client core.RpcClient) *Streamer {
return &Streamer{
Client: client,
}
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *Streamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("statediff", payloadChan, "stream")
}

View File

@ -1,45 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
)
var _ = Describe("Streamer", func() {
Describe("Stream", func() {
It("Streams StatediffPayloads from a Geth RPC subscription", func() {
mockStreamer := mocks.StateDiffStreamer{}
mockStreamer.ReturnSub = &rpc.ClientSubscription{}
mockStreamer.StreamPayloads = []statediff.Payload{
mocks.MockStatediffPayload,
}
payloadChan := make(chan statediff.Payload, 1)
sub, err := mockStreamer.Stream(payloadChan)
Expect(err).ToNot(HaveOccurred())
Expect(sub).To(Equal(&rpc.ClientSubscription{}))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
streamedPayload := <-payloadChan
Expect(streamedPayload).To(Equal(mocks.MockStatediffPayload))
})
})
})

View File

@ -17,7 +17,6 @@
package ipfs
import (
"encoding/json"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -25,47 +24,8 @@ import (
"github.com/ipfs/go-block-format"
)
// Subscription holds the information for an individual client subscription
type Subscription struct {
PayloadChan chan<- ResponsePayload
QuitChan chan<- bool
}
// ResponsePayload holds the data returned from the seed node to the requesting client
type ResponsePayload struct {
BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
TransactionsRlp [][]byte `json:"transactionsRlp"`
ReceiptsRlp [][]byte `json:"receiptsRlp"`
StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"`
StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"`
ErrMsg string `json:"errMsg"`
encoded []byte
err error
}
func (sd *ResponsePayload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
func (sd *ResponsePayload) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sd *ResponsePayload) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}
// CidWrapper is used to package CIDs retrieved from the local Postgres cache
type CidWrapper struct {
// CIDWrapper is used to package CIDs retrieved from the local Postgres cache
type CIDWrapper struct {
BlockNumber *big.Int
Headers []string
Uncles []string
@ -75,8 +35,8 @@ type CidWrapper struct {
StorageNodes []StorageNodeCID
}
// IpldWrapper is used to package raw IPLD block data for resolution
type IpldWrapper struct {
// IPLDWrapper is used to package raw IPLD block data for resolution
type IPLDWrapper struct {
BlockNumber *big.Int
Headers []blocks.Block
Uncles []blocks.Block

View File

@ -14,11 +14,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"context"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
@ -33,13 +35,13 @@ const APIVersion = "0.0.1"
// PublicSeedNodeAPI is the public api for the seed node
type PublicSeedNodeAPI struct {
snp SyncPublishScreenAndServe
snp Processor
}
// NewPublicSeedNodeAPI creates a new PublicSeedNodeAPI with the provided underlying SyncPublishScreenAndServe process
func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI {
func NewPublicSeedNodeAPI(seedNodeProcessor Processor) *PublicSeedNodeAPI {
return &PublicSeedNodeAPI{
snp: snp,
snp: seedNodeProcessor,
}
}
@ -56,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
go func() {
// subscribe to events from the SyncPublishScreenAndServe service
payloadChannel := make(chan ResponsePayload, payloadChanBufferSize)
payloadChannel := make(chan streamer.SeedNodePayload, payloadChanBufferSize)
quitChan := make(chan bool, 1)
go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters)

View File

@ -14,11 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"bytes"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
@ -26,22 +29,22 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/config"
)
// ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload
type ResponseScreener interface {
ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
// ResponseFilterer is the inteface used to screen eth data and package appropriate data into a response payload
type ResponseFilterer interface {
FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (*streamer.SeedNodePayload, error)
}
// Screener is the underlying struct for the ReponseScreener interface
type Screener struct{}
// Filterer is the underlying struct for the ReponseFilterer interface
type Filterer struct{}
// NewResponseScreener creates a new Screener satisfyign the ReponseScreener interface
func NewResponseScreener() *Screener {
return &Screener{}
// NewResponseFilterer creates a new Filterer satisfyign the ReponseFilterer interface
func NewResponseFilterer() *Filterer {
return &Filterer{}
}
// ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload
func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
response := new(ResponsePayload)
// FilterResponse is used to filter through eth data to extract and package requested data into a Payload
func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (*streamer.SeedNodePayload, error) {
response := new(streamer.SeedNodePayload)
err := s.filterHeaders(streamFilters, response, payload)
if err != nil {
return nil, err
@ -66,7 +69,7 @@ func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPL
return response, nil
}
func (s *Screener) filterHeaders(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if !streamFilters.HeaderFilter.FinalOnly {
@ -89,7 +92,7 @@ func checkRange(start, end, actual int64) bool {
return false
}
func (s *Screener) filterTransactions(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, trx := range payload.BlockBody.Transactions {
@ -125,7 +128,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false
}
func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) {
@ -181,12 +184,12 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac
return false
}
func (s *Screener) filterState(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
for _, addr := range streamFilters.StateFilter.Addresses {
keyFilter := AddressToKey(common.HexToAddress(addr))
keyFilter := ipfs.AddressToKey(common.HexToAddress(addr))
keyFilters = append(keyFilters, keyFilter)
}
for key, stateNode := range payload.StateNodes {
@ -213,17 +216,17 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false
}
func (s *Screener) filterStorage(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SeedNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
for _, addr := range streamFilters.StorageFilter.Addresses {
keyFilter := AddressToKey(common.HexToAddress(addr))
keyFilter := ipfs.AddressToKey(common.HexToAddress(addr))
stateKeyFilters = append(stateKeyFilters, keyFilter)
}
storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys))
for _, store := range streamFilters.StorageFilter.StorageKeys {
keyFilter := HexToKey(store)
keyFilter := ipfs.HexToKey(store)
storageKeyFilters = append(storageKeyFilters, keyFilter)
}
for stateKey, storageNodes := range payload.StorageNodes {

View File

@ -14,18 +14,19 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
// CIDRepository is an interface for indexing CIDPayloads
// CIDRepository is an interface for indexing ipfs.CIDPayloads
type CIDRepository interface {
Index(cidPayload *CIDPayload) error
Index(cidPayload *ipfs.CIDPayload) error
}
// Repository is the underlying struct for the CIDRepository interface
@ -41,7 +42,7 @@ func NewCIDRepository(db *postgres.DB) *Repository {
}
// Index indexes a cidPayload in Postgres
func (repo *Repository) Index(cidPayload *CIDPayload) error {
func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error {
tx, err := repo.db.Beginx()
if err != nil {
return err
@ -87,7 +88,7 @@ func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string
return err
}
func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error {
for hash, trxCidMeta := range payload.TransactionCIDs {
var txID int64
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
@ -108,13 +109,13 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CID
return nil
}
func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error {
func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ipfs.ReceiptMetaData, txID int64) error {
_, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s) VALUES ($1, $2, $3, $4)`,
txID, cidMeta.CID, cidMeta.ContractAddress, pq.Array(cidMeta.Topic0s))
return err
}
func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error {
for accountKey, stateCID := range payload.StateNodeCIDs {
var stateID int64
err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4)
@ -134,7 +135,7 @@ func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayloa
return nil
}
func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID, stateID int64) error {
func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID ipfs.StorageNodeCID, stateID int64) error {
_, err := tx.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid, leaf) VALUES ($1, $2, $3, $4)
ON CONFLICT (state_id, storage_key) DO UPDATE SET (cid, leaf) = ($3, $4)`,
stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)

View File

@ -14,11 +14,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
@ -29,7 +31,7 @@ import (
// CIDRetriever is the interface for retrieving CIDs from the Postgres cache
type CIDRetriever interface {
RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error)
RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error)
RetrieveLastBlockNumber() (int64, error)
RetrieveFirstBlockNumber() (int64, error)
}
@ -60,7 +62,7 @@ func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
}
// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error) {
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) {
log.Debug("retrieving cids")
var err error
tx, err := ecr.db.Beginx()
@ -69,7 +71,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc
}
// THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS
// WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO
cw := new(CidWrapper)
cw := new(ipfs.CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
// Retrieve cached header CIDs
@ -212,7 +214,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
return receiptCids, err
}
func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StateNodeCID, error) {
func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StateNodeCID, error) {
log.Debug("retrieving state cids for block ", blockNumber)
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
@ -222,7 +224,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.
if addrLen > 0 {
keys := make([]string, 0, addrLen)
for _, addr := range streamFilters.StateFilter.Addresses {
keys = append(keys, HexToKey(addr).Hex())
keys = append(keys, ipfs.HexToKey(addr).Hex())
}
pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(keys))
@ -230,12 +232,12 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.
if !streamFilters.StorageFilter.IntermediateNodes {
pgStr += ` AND state_cids.leaf = TRUE`
}
stateNodeCIDs := make([]StateNodeCID, 0)
stateNodeCIDs := make([]ipfs.StateNodeCID, 0)
err := tx.Select(&stateNodeCIDs, pgStr, args...)
return stateNodeCIDs, err
}
func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StorageNodeCID, error) {
func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StorageNodeCID, error) {
log.Debug("retrieving storage cids for block ", blockNumber)
args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids
@ -247,7 +249,7 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi
if addrLen > 0 {
keys := make([]string, 0, addrLen)
for _, addr := range streamFilters.StorageFilter.Addresses {
keys = append(keys, HexToKey(addr).Hex())
keys = append(keys, ipfs.HexToKey(addr).Hex())
}
pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(keys))
@ -259,7 +261,7 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi
if !streamFilters.StorageFilter.IntermediateNodes {
pgStr += ` AND storage_cids.leaf = TRUE`
}
storageNodeCIDs := make([]StorageNodeCID, 0)
storageNodeCIDs := make([]ipfs.StorageNodeCID, 0)
err := tx.Select(&storageNodeCIDs, pgStr, args...)
return storageNodeCIDs, err
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
package seed_node_test
import (
"io/ioutil"
@ -26,9 +26,9 @@ import (
. "github.com/onsi/gomega"
)
func TestIPFS(t *testing.T) {
func TestSeedNode(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "IPFS Suite Test")
RunSpecs(t, "Seed Node Suite Test")
}
var _ = BeforeSuite(func() {

View File

@ -14,11 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"sync"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
@ -36,18 +39,18 @@ import (
const payloadChanBufferSize = 20000 // the max eth sub buffer size
const workerPoolSize = 1
// SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing,
// Processor is the top level interface for streaming, converting to IPLDs, publishing,
// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
// This service is compatible with the Ethereum service interface (node.Service)
type SyncPublishScreenAndServe interface {
type Processor interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
// Main event loop for syncAndPublish processes
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- ipfs.IPLDPayload, forwardQuitchan chan<- bool) error
// Main event loop for handling client pub-sub
ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool)
ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription)
Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID)
}
@ -57,21 +60,21 @@ type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
// Interface for streaming statediff payloads over a geth rpc subscription
Streamer StateDiffStreamer
Streamer streamer.IStateDiffStreamer
// Interface for converting statediff payloads into ETH-IPLD object payloads
Converter PayloadConverter
Converter ipfs.PayloadConverter
// Interface for publishing the ETH-IPLD payloads to IPFS
Publisher IPLDPublisher
Publisher ipfs.IPLDPublisher
// Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
Repository CIDRepository
// Interface for filtering and serving data according to subscribed clients according to their specification
Screener ResponseScreener
Filterer ResponseFilterer
// Interface for fetching ETH-IPLD objects from IPFS
Fetcher IPLDFetcher
Fetcher ipfs.IPLDFetcher
// Interface for searching and retrieving CIDs from Postgres index
Retriever CIDRetriever
// Interface for resolving ipfs blocks to their data types
Resolver IPLDResolver
Resolver ipfs.IPLDResolver
// Chan the processor uses to subscribe to state diff payloads from the Streamer
PayloadChan chan statediff.Payload
// Used to signal shutdown of the service
@ -82,25 +85,25 @@ type Service struct {
SubscriptionTypes map[common.Hash]config.Subscription
}
// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishScreenAndServe, error) {
publisher, err := NewIPLDPublisher(ipfsPath)
// NewProcessor creates a new Processor interface using an underlying Service struct
func NewProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (Processor, error) {
publisher, err := ipfs.NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
}
fetcher, err := NewIPLDFetcher(ipfsPath)
fetcher, err := ipfs.NewIPLDFetcher(ipfsPath)
if err != nil {
return nil, err
}
return &Service{
Streamer: NewStateDiffStreamer(rpcClient),
Streamer: streamer.NewStateDiffStreamer(rpcClient),
Repository: NewCIDRepository(db),
Converter: NewPayloadConverter(ethClient),
Converter: ipfs.NewPayloadConverter(ethClient),
Publisher: publisher,
Screener: NewResponseScreener(),
Filterer: NewResponseFilterer(),
Fetcher: fetcher,
Retriever: NewCIDRetriever(db),
Resolver: NewIPLDResolver(),
Resolver: ipfs.NewIPLDResolver(),
PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
QuitChan: qc,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
@ -128,7 +131,7 @@ func (sap *Service) APIs() []rpc.API {
// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
// This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
// which filters and sends relevant data to client subscriptions, if there are any
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- IPLDPayload, screenAndServeQuit chan<- bool) error {
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- ipfs.IPLDPayload, screenAndServeQuit chan<- bool) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil {
return err
@ -136,7 +139,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
wg.Add(1)
// Channels for forwarding data to the publishAndIndex workers
publishAndIndexPayload := make(chan IPLDPayload, payloadChanBufferSize)
publishAndIndexPayload := make(chan ipfs.IPLDPayload, payloadChanBufferSize)
publishAndIndexQuit := make(chan bool, workerPoolSize)
// publishAndIndex worker pool to handle publishing and indexing concurrently, while
// limiting the number of Postgres connections we can possibly open so as to prevent error
@ -192,7 +195,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
return nil
}
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPayload, publishAndIndexQuit <-chan bool) {
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan ipfs.IPLDPayload, publishAndIndexQuit <-chan bool) {
go func() {
for {
select {
@ -216,7 +219,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan IPLDPa
// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, screenAndServeQuit <-chan bool) {
func (sap *Service) ScreenAndServe(screenAndServePayload <-chan ipfs.IPLDPayload, screenAndServeQuit <-chan bool) {
go func() {
for {
select {
@ -233,7 +236,7 @@ func (sap *Service) ScreenAndServe(screenAndServePayload <-chan IPLDPayload, scr
}()
}
func (sap *Service) sendResponse(payload IPLDPayload) error {
func (sap *Service) sendResponse(payload ipfs.IPLDPayload) error {
sap.Lock()
for ty, subs := range sap.Subscriptions {
// Retrieve the subscription parameters for this subscription type
@ -242,7 +245,7 @@ func (sap *Service) sendResponse(payload IPLDPayload) error {
log.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
continue
}
response, err := sap.Screener.ScreenResponse(subConfig, payload)
response, err := sap.Filterer.FilterResponse(subConfig, payload)
if err != nil {
log.Error(err)
continue
@ -261,7 +264,7 @@ func (sap *Service) sendResponse(payload IPLDPayload) error {
}
// Subscribe is used by the API to subscribe to the service loop
func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription) {
func (sap *Service) Subscribe(id rpc.ID, sub chan<- streamer.SeedNodePayload, quitChan chan<- bool, streamFilters config.Subscription) {
log.Info("Subscribing to the seed node service")
// Subscription type is defined as the hash of its content
// Group subscriptions by type and screen payloads once for subs of the same type
@ -299,7 +302,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
var err error
startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber()
if err != nil {
sub.PayloadChan <- ResponsePayload{
sub.PayloadChan <- streamer.SeedNodePayload{
ErrMsg: "unable to set block range; error: " + err.Error(),
}
}
@ -309,7 +312,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
if con.EndingBlock.Int64() <= 0 || con.EndingBlock.Int64() <= startingBlock {
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil {
sub.PayloadChan <- ResponsePayload{
sub.PayloadChan <- streamer.SeedNodePayload{
ErrMsg: "unable to set block range; error: " + err.Error(),
}
}
@ -323,18 +326,18 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
for i := con.StartingBlock.Int64(); i <= endingBlock; i++ {
cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i)
if err != nil {
sub.PayloadChan <- ResponsePayload{
sub.PayloadChan <- streamer.SeedNodePayload{
ErrMsg: "CID retrieval error: " + err.Error(),
}
continue
}
if emptyCidWrapper(*cidWrapper) {
if ipfs.EmptyCIDWrapper(*cidWrapper) {
continue
}
blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper)
if err != nil {
log.Error(err)
sub.PayloadChan <- ResponsePayload{
sub.PayloadChan <- streamer.SeedNodePayload{
ErrMsg: "IPLD fetching error: " + err.Error(),
}
continue
@ -342,7 +345,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper)
if err != nil {
log.Error(err)
sub.PayloadChan <- ResponsePayload{
sub.PayloadChan <- streamer.SeedNodePayload{
ErrMsg: "IPLD resolving error: " + err.Error(),
}
continue
@ -376,7 +379,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting seed node service")
wg := new(sync.WaitGroup)
payloadChan := make(chan IPLDPayload, payloadChanBufferSize)
payloadChan := make(chan ipfs.IPLDPayload, payloadChanBufferSize)
quitChan := make(chan bool, 1)
if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
return err

View File

@ -14,19 +14,22 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs_test
package seed_node_test
import (
"sync"
"time"
"github.com/vulcanize/vulcanizedb/pkg/seed_node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
mocks2 "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
mocks3 "github.com/vulcanize/vulcanizedb/pkg/seed_node/mocks"
)
var _ = Describe("Service", func() {
@ -36,14 +39,14 @@ var _ = Describe("Service", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan statediff.Payload, 1)
quitChan := make(chan bool, 1)
mockCidRepo := &mocks.CIDRepository{
mockCidRepo := &mocks3.CIDRepository{
ReturnErr: nil,
}
mockPublisher := &mocks.IPLDPublisher{
ReturnCIDPayload: &mocks.MockCIDPayload,
ReturnErr: nil,
}
mockStreamer := &mocks.StateDiffStreamer{
mockStreamer := &mocks2.StateDiffStreamer{
ReturnSub: &rpc.ClientSubscription{},
StreamPayloads: []statediff.Payload{
mocks.MockStatediffPayload,
@ -54,7 +57,7 @@ var _ = Describe("Service", func() {
ReturnIPLDPayload: &mocks.MockIPLDPayload,
ReturnErr: nil,
}
processor := &ipfs.Service{
processor := &seed_node.Service{
Repository: mockCidRepo,
Publisher: mockPublisher,
Streamer: mockStreamer,

View File

@ -14,4 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
package seed_node
import (
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
)
// Subscription holds the information for an individual client subscription to the seed node
type Subscription struct {
PayloadChan chan<- streamer.SeedNodePayload
QuitChan chan<- bool
}