forked from cerc-io/ipld-eth-server
fix streamFilters issue
This commit is contained in:
parent
b5099a5051
commit
8ccdfd4835
77
cmd/test.go
77
cmd/test.go
@ -1,77 +0,0 @@
|
||||
// Copyright © 2019 Vulcanize, Inc
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
|
||||
)
|
||||
|
||||
// testCmd represents the test command
|
||||
var testCmd = &cobra.Command{
|
||||
Use: "test",
|
||||
Short: "A brief description of your command",
|
||||
Long: `A longer description that spans multiple lines and likely contains examples
|
||||
and usage of using your command. For example:
|
||||
|
||||
Cobra is a CLI library for Go that empowers applications.
|
||||
This application is a tool to generate the needed files
|
||||
to quickly create a Cobra application.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
test()
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(testCmd)
|
||||
}
|
||||
|
||||
func test() {
|
||||
_, _, rpcClient := getBlockChainAndClients()
|
||||
streamer := ipfs.NewStateDiffStreamer(rpcClient)
|
||||
payloadChan := make(chan statediff.Payload, 800)
|
||||
sub, err := streamer.Stream(payloadChan)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Fatal(err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case payload := <-payloadChan:
|
||||
fmt.Printf("blockRlp: %v\r\nstateDiffRlp: %v\r\nerror: %v\r\n", payload.BlockRlp, payload.StateDiffRlp, payload.Err)
|
||||
var block types.Block
|
||||
err := rlp.DecodeBytes(payload.BlockRlp, &block)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var stateDiff statediff.StateDiff
|
||||
err = rlp.DecodeBytes(payload.StateDiffRlp, &stateDiff)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("block: %v\r\nstateDiff: %v\r\n", block, stateDiff)
|
||||
fmt.Printf("block number: %d\r\n", block.Number())
|
||||
case err = <-sub.Err():
|
||||
println(err.Error())
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
73
cmd/test2.go
73
cmd/test2.go
@ -16,10 +16,11 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/i-norden/go-ethereum/rlp"
|
||||
"bytes"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@ -48,15 +49,18 @@ to quickly create a Cobra application.`,
|
||||
func test2() {
|
||||
rpcClient := getRpcClient()
|
||||
str := streamer.NewSeedStreamer(rpcClient)
|
||||
payloadChan := make(chan ipfs.ResponsePayload, 800)
|
||||
filter := ipfs.StreamFilters{}
|
||||
filter.HeaderFilter.FinalOnly = true
|
||||
filter.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
filter.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
filter.ReceiptFilter.Topic0s = []string{}
|
||||
filter.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
filter.StorageFilter.Off = true
|
||||
sub, err := str.Stream(payloadChan, filter)
|
||||
payloadChan := make(chan ipfs.ResponsePayload, 8000)
|
||||
streamFilters := ipfs.StreamFilters{}
|
||||
streamFilters.HeaderFilter.FinalOnly = true
|
||||
streamFilters.ReceiptFilter.Topic0s = []string{
|
||||
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
|
||||
"0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377",
|
||||
}
|
||||
streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
streamFilters.StorageFilter.Off = true
|
||||
//streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
//streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
sub, err := str.Stream(payloadChan, streamFilters)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Fatal(err)
|
||||
@ -68,29 +72,52 @@ func test2() {
|
||||
log.Error(payload.Err)
|
||||
}
|
||||
for _, headerRlp := range payload.HeadersRlp {
|
||||
header := new(types.Header)
|
||||
err = rlp.DecodeBytes(headerRlp, header)
|
||||
var header types.Header
|
||||
err = rlp.Decode(bytes.NewBuffer(headerRlp), &header)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Error(err)
|
||||
}
|
||||
println("header")
|
||||
println(header.TxHash.Hex())
|
||||
println(header.Hash().Hex())
|
||||
println(header.Number.Int64())
|
||||
}
|
||||
for _, trxRlp := range payload.TransactionsRlp {
|
||||
trx := new(types.Transaction)
|
||||
err = rlp.DecodeBytes(trxRlp, trx)
|
||||
var trx types.Transaction
|
||||
buff := bytes.NewBuffer(trxRlp)
|
||||
stream := rlp.NewStream(buff, 0)
|
||||
err := trx.DecodeRLP(stream)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Error(err)
|
||||
}
|
||||
println("trx")
|
||||
println(trx.Hash().Hex())
|
||||
println(trx.Value().Int64())
|
||||
}
|
||||
for _, rctRlp := range payload.ReceiptsRlp {
|
||||
rct := new(types.Receipt)
|
||||
err = rlp.DecodeBytes(rctRlp, rct)
|
||||
var rct types.Receipt
|
||||
buff := bytes.NewBuffer(rctRlp)
|
||||
stream := rlp.NewStream(buff, 0)
|
||||
err = rct.DecodeRLP(stream)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Error(err)
|
||||
}
|
||||
println("rct")
|
||||
println(rct.TxHash.Hex())
|
||||
println(rct.BlockNumber.Bytes())
|
||||
for _, l := range rct.Logs {
|
||||
println("log")
|
||||
println(l.BlockHash.Hex())
|
||||
println(l.TxHash.Hex())
|
||||
println(l.Address.Hex())
|
||||
}
|
||||
}
|
||||
for _, stateRlp := range payload.StateNodesRlp {
|
||||
acct := new(state.Account)
|
||||
err = rlp.DecodeBytes(stateRlp, acct)
|
||||
var acct state.Account
|
||||
err = rlp.Decode(bytes.NewBuffer(stateRlp), &acct)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
log.Error(err)
|
||||
}
|
||||
println("state")
|
||||
println(acct.Root.Hex())
|
||||
println(acct.Balance.Int64())
|
||||
|
@ -43,5 +43,5 @@ func NewSeedStreamer(client core.RpcClient) *Streamer {
|
||||
|
||||
// Stream is the main loop for subscribing to data from a vulcanizedb seed node
|
||||
func (sds *Streamer) Stream(payloadChan chan ipfs.ResponsePayload, streamFilters ipfs.StreamFilters) (*rpc.ClientSubscription, error) {
|
||||
return sds.Client.Subscribe("vulcanizedb", payloadChan, "subscribe")
|
||||
return sds.Client.Subscribe("vulcanizedb", payloadChan, "stream", streamFilters)
|
||||
}
|
||||
|
@ -29,5 +29,5 @@ type RPCClient interface {
|
||||
BatchCall(batch []client.BatchElem) error
|
||||
IpcPath() string
|
||||
SupportedModules() (map[string]string, error)
|
||||
Subscribe(namespace string, payloadChan interface{}, subName string, args ...interface{}) (*rpc.ClientSubscription, error)
|
||||
Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
|
||||
}
|
||||
|
@ -90,4 +90,4 @@ func (client RPCClient) Subscribe(namespace string, payloadChan interface{}, arg
|
||||
return nil, errors.New("channel given to Subscribe must not be nil")
|
||||
}
|
||||
return client.client.Subscribe(context.Background(), namespace, payloadChan, args...)
|
||||
}
|
||||
}
|
||||
|
@ -188,6 +188,6 @@ func (client *MockRPCClient) AssertBatchCalledWith(method string, lengthOfBatch
|
||||
Expect(client.passedMethod).To(Equal(method))
|
||||
}
|
||||
|
||||
func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, subName string, args ...interface{}) (*rpc.ClientSubscription, error) {
|
||||
func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package ipfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
)
|
||||
@ -40,29 +41,19 @@ func NewPublicSeedNodeAPI(snp SyncPublishScreenAndServe) *PublicSeedNodeAPI {
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created
|
||||
func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeDefOnly chan ResponsePayload) (*rpc.Subscription, error) {
|
||||
// Stream is the public method to setup a subscription that fires off SyncPublishScreenAndServe payloads as they are created
|
||||
func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters StreamFilters) (*rpc.Subscription, error) {
|
||||
// ensure that the RPC connection supports subscriptions
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
if !supported {
|
||||
return nil, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
streamFilters := StreamFilters{}
|
||||
streamFilters.HeaderFilter.FinalOnly = true
|
||||
streamFilters.TrxFilter.Src = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
streamFilters.TrxFilter.Dst = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
streamFilters.ReceiptFilter.Topic0s = []string{
|
||||
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
|
||||
"0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377",
|
||||
}
|
||||
streamFilters.StateFilter.Addresses = []string{"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"}
|
||||
streamFilters.StorageFilter.Off = true
|
||||
// create subscription and start waiting for statediff events
|
||||
rpcSub := notifier.CreateSubscription()
|
||||
|
||||
go func() {
|
||||
// subscribe to events from the state diff service
|
||||
// subscribe to events from the SyncPublishScreenAndServe service
|
||||
payloadChannel := make(chan ResponsePayload)
|
||||
quitChan := make(chan bool)
|
||||
go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
|
||||
@ -81,11 +72,11 @@ func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, payloadChanForTypeD
|
||||
}
|
||||
return
|
||||
case <-quitChan:
|
||||
// don't need to unsubscribe, statediff service does so before sending the quit signal
|
||||
// don't need to unsubscribe, SyncPublishScreenAndServe service does so before sending the quit signal
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
}
|
||||
|
@ -27,9 +27,9 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// IPLDFethcer is an interface for fetching IPLDs
|
||||
// IPLDFetcher is an interface for fetching IPLDs
|
||||
type IPLDFetcher interface {
|
||||
FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error)
|
||||
FetchCIDs(cids CidWrapper) (*IpldWrapper, error)
|
||||
}
|
||||
|
||||
// EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS
|
||||
@ -48,9 +48,9 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// FetchCIDs is the exported method for fetching and returning all the cids passed in a cidWrapper
|
||||
func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) {
|
||||
blocks := &ipfsBlockWrapper{
|
||||
// FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper
|
||||
func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
|
||||
blocks := &IpldWrapper{
|
||||
Headers: make([]blocks.Block, 0),
|
||||
Transactions: make([]blocks.Block, 0),
|
||||
Receipts: make([]blocks.Block, 0),
|
||||
@ -84,7 +84,7 @@ func (f *EthIPLDFetcher) FetchCIDs(cids cidWrapper) (*ipfsBlockWrapper, error) {
|
||||
|
||||
// fetchHeaders fetches headers
|
||||
// It uses the f.fetchBatch method
|
||||
func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper) error {
|
||||
func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) error {
|
||||
headerCids := make([]cid.Cid, 0, len(cids.Headers))
|
||||
for _, c := range cids.Headers {
|
||||
dc, err := cid.Decode(c)
|
||||
@ -102,7 +102,7 @@ func (f *EthIPLDFetcher) fetchHeaders(cids cidWrapper, blocks *ipfsBlockWrapper)
|
||||
|
||||
// fetchTrxs fetches transactions
|
||||
// It uses the f.fetchBatch method
|
||||
func (f *EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) error {
|
||||
func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error {
|
||||
trxCids := make([]cid.Cid, 0, len(cids.Transactions))
|
||||
for _, c := range cids.Transactions {
|
||||
dc, err := cid.Decode(c)
|
||||
@ -120,7 +120,7 @@ func (f *EthIPLDFetcher) fetchTrxs(cids cidWrapper, blocks *ipfsBlockWrapper) er
|
||||
|
||||
// fetchRcts fetches receipts
|
||||
// It uses the f.fetchBatch method
|
||||
func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) error {
|
||||
func (f *EthIPLDFetcher) fetchRcts(cids CidWrapper, blocks *IpldWrapper) error {
|
||||
rctCids := make([]cid.Cid, 0, len(cids.Receipts))
|
||||
for _, c := range cids.Receipts {
|
||||
dc, err := cid.Decode(c)
|
||||
@ -139,7 +139,7 @@ func (f *EthIPLDFetcher) fetchRcts(cids cidWrapper, blocks *ipfsBlockWrapper) er
|
||||
// fetchState fetches state nodes
|
||||
// It uses the single f.fetch method instead of the batch fetch, because it
|
||||
// needs to maintain the data's relation to state keys
|
||||
func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) error {
|
||||
func (f *EthIPLDFetcher) fetchState(cids CidWrapper, blocks *IpldWrapper) error {
|
||||
for _, stateNode := range cids.StateNodes {
|
||||
if stateNode.CID == "" || stateNode.Key == "" {
|
||||
continue
|
||||
@ -160,7 +160,7 @@ func (f *EthIPLDFetcher) fetchState(cids cidWrapper, blocks *ipfsBlockWrapper) e
|
||||
// fetchStorage fetches storage nodes
|
||||
// It uses the single f.fetch method instead of the batch fetch, because it
|
||||
// needs to maintain the data's relation to state and storage keys
|
||||
func (f *EthIPLDFetcher) fetchStorage(cids cidWrapper, blocks *ipfsBlockWrapper) error {
|
||||
func (f *EthIPLDFetcher) fetchStorage(cids CidWrapper, blocks *IpldWrapper) error {
|
||||
for _, storageNode := range cids.StorageNodes {
|
||||
if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" {
|
||||
continue
|
||||
|
@ -122,7 +122,7 @@ func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Package CIDs into a single struct
|
||||
// Package CIDs and their metadata into a single struct
|
||||
return &CIDPayload{
|
||||
BlockHash: payload.BlockHash,
|
||||
BlockNumber: payload.BlockNumber.String(),
|
||||
@ -147,6 +147,13 @@ func (pub *Publisher) publishHeaders(headerRLP []byte) (string, error) {
|
||||
}
|
||||
|
||||
func (pub *Publisher) publishTransactions(blockBody *types.Body, trxMeta []*TrxMetaData) (map[common.Hash]*TrxMetaData, error) {
|
||||
/*
|
||||
println("publishing transactions")
|
||||
for _, trx := range blockBody.Transactions {
|
||||
println("trx value:")
|
||||
println(trx.Value().Int64())
|
||||
}
|
||||
*/
|
||||
transactionCids, err := pub.TransactionPutter.DagPut(blockBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -22,17 +22,21 @@ import (
|
||||
"github.com/ipfs/go-block-format"
|
||||
)
|
||||
|
||||
// IPLDResolver is the interface to resolving IPLDs
|
||||
type IPLDResolver interface {
|
||||
ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error)
|
||||
ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error)
|
||||
}
|
||||
|
||||
// EthIPLDResolver is the underlying struct to support the IPLDResolver interface
|
||||
type EthIPLDResolver struct{}
|
||||
|
||||
// NewIPLDResolver returns a pointer to an EthIPLDResolver which satisfies the IPLDResolver interface
|
||||
func NewIPLDResolver() *EthIPLDResolver {
|
||||
return &EthIPLDResolver{}
|
||||
}
|
||||
|
||||
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks ipfsBlockWrapper) (*ResponsePayload, error) {
|
||||
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
|
||||
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) {
|
||||
response := new(ResponsePayload)
|
||||
eir.resolveHeaders(ipfsBlocks.Headers, response)
|
||||
eir.resolveTransactions(ipfsBlocks.Transactions, response)
|
||||
|
@ -22,26 +22,32 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
)
|
||||
|
||||
// CIDRetriever is the interface for retrieving CIDs from the Postgres cache
|
||||
type CIDRetriever interface {
|
||||
RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error)
|
||||
RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error)
|
||||
}
|
||||
|
||||
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
|
||||
type EthCIDRetriever struct {
|
||||
db *postgres.DB
|
||||
}
|
||||
|
||||
// NewCIDRetriever returns a pointer to a new EthCIDRetriever which supports the CIDRetriever interface
|
||||
func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever {
|
||||
return &EthCIDRetriever{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// GetLastBlockNumber is used to retrieve the latest block number in the cache
|
||||
func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) {
|
||||
var blockNumber int64
|
||||
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ")
|
||||
return blockNumber, err
|
||||
}
|
||||
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrapper, error) {
|
||||
|
||||
// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters
|
||||
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]CidWrapper, error) {
|
||||
var endingBlock int64
|
||||
var err error
|
||||
if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock {
|
||||
@ -50,13 +56,13 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrap
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cids := make([]cidWrapper, 0, endingBlock+1-streamFilters.StartingBlock)
|
||||
cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock)
|
||||
tx, err := ecr.db.Beginx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := streamFilters.StartingBlock; i <= endingBlock; i++ {
|
||||
cw := &cidWrapper{
|
||||
cw := &CidWrapper{
|
||||
BlockNumber: i,
|
||||
Headers: make([]string, 0),
|
||||
Transactions: make([]string, 0),
|
||||
@ -106,7 +112,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters StreamFilters) ([]cidWrap
|
||||
return cids, err
|
||||
}
|
||||
|
||||
func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
|
||||
func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
|
||||
var pgStr string
|
||||
if streamFilters.HeaderFilter.FinalOnly {
|
||||
pgStr = `SELECT cid FROM header_cids
|
||||
@ -119,10 +125,10 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters Stream
|
||||
return tx.Select(cids.Headers, pgStr, blockNumber)
|
||||
}
|
||||
|
||||
func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) ([]int64, error) {
|
||||
func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) ([]int64, error) {
|
||||
args := make([]interface{}, 0, 3)
|
||||
type result struct {
|
||||
Id int64 `db:"id"`
|
||||
ID int64 `db:"id"`
|
||||
Cid string `db:"cid"`
|
||||
}
|
||||
results := make([]result, 0)
|
||||
@ -144,12 +150,12 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters StreamFil
|
||||
ids := make([]int64, 0)
|
||||
for _, res := range results {
|
||||
cids.Transactions = append(cids.Transactions, res.Cid)
|
||||
ids = append(ids, res.Id)
|
||||
ids = append(ids, res.ID)
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64, trxIds []int64) error {
|
||||
func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64, trxIds []int64) error {
|
||||
args := make([]interface{}, 0, 2)
|
||||
pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
|
||||
WHERE receipt_cids.tx_id = transaction_cids.id
|
||||
@ -169,7 +175,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters StreamFil
|
||||
return tx.Select(cids.Receipts, pgStr, args...)
|
||||
}
|
||||
|
||||
func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
|
||||
func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
|
||||
args := make([]interface{}, 0, 2)
|
||||
pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
|
||||
WHERE header_cids.block_number = $1`
|
||||
@ -186,7 +192,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters StreamF
|
||||
return tx.Select(cids.StateNodes, pgStr, args...)
|
||||
}
|
||||
|
||||
func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *cidWrapper, blockNumber int64) error {
|
||||
func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters StreamFilters, cids *CidWrapper, blockNumber int64) error {
|
||||
args := make([]interface{}, 0, 3)
|
||||
pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids
|
||||
WHERE storage_cids.state_id = state_cids.id
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||
)
|
||||
|
||||
const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
|
||||
const payloadChanBufferSize = 8000 // the max eth sub buffer size
|
||||
|
||||
// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
|
||||
// indexing all Ethereum data screening this data, and serving it up to subscribed clients
|
||||
@ -118,7 +118,7 @@ func (sap *Service) APIs() []rpc.API {
|
||||
}
|
||||
|
||||
// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
|
||||
// It then forwards the data to the Serve() loop which filters and sends relevent data to client subscriptions
|
||||
// It then forwards the data to the Serve() loop which filters and sends relevant data to client subscriptions
|
||||
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error {
|
||||
sub, err := sap.Streamer.Stream(sap.PayloadChan)
|
||||
if err != nil {
|
||||
@ -139,7 +139,6 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<-
|
||||
continue
|
||||
}
|
||||
// If we have a ScreenAndServe process running, forward the payload to it
|
||||
// If the ScreenAndServe process loop is slower than this one, will it miss some incoming payloads??
|
||||
select {
|
||||
case forwardPayloadChan <- *ipldPayload:
|
||||
default:
|
||||
@ -284,7 +283,7 @@ func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
|
||||
if ok {
|
||||
select {
|
||||
case sub.PayloadChan <- payload:
|
||||
log.Infof("sending state diff payload to subscription %s", id)
|
||||
log.Infof("sending seed node payload to subscription %s", id)
|
||||
default:
|
||||
log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
|
||||
}
|
||||
|
@ -42,5 +42,5 @@ func NewStateDiffStreamer(client core.RpcClient) *Streamer {
|
||||
|
||||
// Stream is the main loop for subscribing to data from the Geth state diff process
|
||||
func (sds *Streamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
|
||||
return sds.Client.Subscribe("statediff", payloadChan, "subscribe")
|
||||
return sds.Client.Subscribe("statediff", payloadChan, "stream")
|
||||
}
|
||||
|
@ -171,14 +171,14 @@ var (
|
||||
common.HexToHash("0x0"): {
|
||||
{
|
||||
CID: "mockStorageCID1",
|
||||
Key: common.HexToHash("0x0"),
|
||||
Key: "0x0",
|
||||
Leaf: true,
|
||||
},
|
||||
},
|
||||
common.HexToHash("0x1"): {
|
||||
{
|
||||
CID: "mockStorageCID2",
|
||||
Key: common.HexToHash("0x1"),
|
||||
Key: "0x1",
|
||||
Leaf: true,
|
||||
},
|
||||
},
|
||||
|
@ -1,17 +0,0 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2019 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package test_helpers
|
@ -65,7 +65,8 @@ func (sd *ResponsePayload) Encode() ([]byte, error) {
|
||||
return sd.encoded, sd.err
|
||||
}
|
||||
|
||||
type cidWrapper struct {
|
||||
// CidWrapper is used to package CIDs retrieved from the local Postgres cache
|
||||
type CidWrapper struct {
|
||||
BlockNumber int64
|
||||
Headers []string
|
||||
Transactions []string
|
||||
@ -74,7 +75,8 @@ type cidWrapper struct {
|
||||
StorageNodes []StorageNodeCID
|
||||
}
|
||||
|
||||
type ipfsBlockWrapper struct {
|
||||
// IpldWrapper is used to package raw IPLD block data for resolution
|
||||
type IpldWrapper struct {
|
||||
Headers []blocks.Block
|
||||
Transactions []blocks.Block
|
||||
Receipts []blocks.Block
|
||||
|
Loading…
Reference in New Issue
Block a user