From e7cdd6247eecd1f1f0fad4ccd0466a29ed8d2ba7 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 7 Jun 2019 11:01:29 -0500 Subject: [PATCH] update to use ReceiptForStorage; expose rpc server over ws --- cmd/streamSubscribe.go | 6 +- cmd/syncPublishScreenAndServe.go | 23 ++++++ environments/syncPublishScreenAndServe.toml | 52 +++++++++++++ pkg/ipfs/retreiver.go | 12 +-- pkg/ipfs/screener.go | 3 +- pkg/ipfs/service.go | 73 ++++++++++--------- .../pkg/ipfs/eth_block_receipts/dag_putter.go | 7 +- 7 files changed, 130 insertions(+), 46 deletions(-) create mode 100644 environments/syncPublishScreenAndServe.toml diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index 3b66c8de..87b3fe78 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -95,10 +95,10 @@ func streamSubscribe() { continue } fmt.Printf("Transaction with hash %s\n", trx.Hash().Hex()) - fmt.Printf("trx: %v", trx) + fmt.Printf("trx: %v\n", trx) } for _, rctRlp := range payload.ReceiptsRlp { - var rct types.Receipt + var rct types.ReceiptForStorage buff := bytes.NewBuffer(rctRlp) stream := rlp.NewStream(buff, 0) err = rct.DecodeRLP(stream) @@ -107,7 +107,7 @@ func streamSubscribe() { continue } fmt.Printf("Receipt with block hash %s, trx hash %s\n", rct.BlockHash.Hex(), rct.TxHash.Hex()) - fmt.Printf("rct: %v", rct) + fmt.Printf("rct: %v\n", rct) for _, l := range rct.Logs { if len(l.Topics) < 1 { log.Error(fmt.Sprintf("log only has %d topics", len(l.Topics))) diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go index 6512e8fa..91648745 100644 --- a/cmd/syncPublishScreenAndServe.go +++ b/cmd/syncPublishScreenAndServe.go @@ -16,11 +16,14 @@ package cmd import ( + "os" + "path/filepath" syn "sync" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/utils" @@ -65,9 +68,29 @@ func syncPublishScreenAndServe() { log.Fatal(err) } processor.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) + + var ipcPath string + ipcPath = viper.GetString("server.ipcPath") + if ipcPath == "" { + home, err := os.UserHomeDir() + if err != nil { + log.Fatal(err) + } + ipcPath = filepath.Join(home, ".vulcanize/vulcanize.ipc") + } _, _, err = rpc.StartIPCEndpoint(vulcPath, processor.APIs()) if err != nil { log.Fatal(err) } + + var wsEndpoint string + wsEndpoint = viper.GetString("server.wsEndpoint") + if wsEndpoint == "" { + wsEndpoint = "127.0.0.1:2019" + } + _, _, err = rpc.StartWSEndpoint(wsEndpoint, processor.APIs(), []string{"vulcanizedb"}, nil, true) + if err != nil { + log.Fatal(err) + } wg.Wait() } diff --git a/environments/syncPublishScreenAndServe.toml b/environments/syncPublishScreenAndServe.toml new file mode 100644 index 00000000..d3e2159d --- /dev/null +++ b/environments/syncPublishScreenAndServe.toml @@ -0,0 +1,52 @@ +[database] + name = "vulcanize_demo" + hostname = "localhost" + port = 5432 + +[client] + ipcPath = "ws://127.0.0.1:8546" + +[server] + ipcPath = "/Users/iannorden/.vulcanize/vulcanize.ipc" + wsEndpoint = "127.0.0.1:2019" + +[subscription] + path = "ws://127.0.0.1:2019" + backfill = true + backfillOnly = false + startingBlock = 0 + endingBlock = 0 + [subscription.headerFilter] + off = false + finalOnly = true + [subscription.trxFilter] + off = false + src = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + dst = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", + ] + [subscription.receiptFilter] + off = false + topic0s = [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" + ] + [subscription.stateFilter] + off = false + addresses = [ + "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" + ] + intermediateNodes = false + [subscription.storageFilter] + off = true + addresses = [ + "", + "" + ] + storageKeys = [ + "", + "" + ] + intermediateNodes = false \ No newline at end of file diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go index 6bf8e583..ab1d3c7b 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/ipfs/retreiver.go @@ -65,6 +65,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C if err != nil { return nil, err } + log.Debug("backfill starting block:", streamFilters.StartingBlock) + log.Debug("backfill ending block:", endingBlock) for i := streamFilters.StartingBlock; i <= endingBlock; i++ { cw := CidWrapper{} if !streamFilters.HeaderFilter.Off { @@ -115,7 +117,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C } func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { - log.Debug("retrieving header cids") + log.Debug("retrieving header cids for block ", blockNumber) headers := make([]string, 0) pgStr := `SELECT cid FROM header_cids WHERE block_number = $1` @@ -127,7 +129,7 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config } func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, []int64, error) { - log.Debug("retrieving transaction cids") + log.Debug("retrieving transaction cids for block ", blockNumber) args := make([]interface{}, 0, 3) type result struct { ID int64 `db:"id"` @@ -159,7 +161,7 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Su } func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64, trxIds []int64) ([]string, error) { - log.Debug("retrieving receipt cids") + log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 2) pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids WHERE receipt_cids.tx_id = transaction_cids.id @@ -182,7 +184,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su } func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StateNodeCID, error) { - log.Debug("retrieving state cids") + log.Debug("retrieving state cids for block ", blockNumber) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.cid, state_cids.state_key FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` @@ -205,7 +207,7 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config. } func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]StorageNodeCID, error) { - log.Debug("retrieving storage cids") + log.Debug("retrieving storage cids for block ", blockNumber) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key FROM storage_cids, state_cids, header_cids WHERE storage_cids.state_id = state_cids.id diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go index 8305a84d..98ab9030 100644 --- a/pkg/ipfs/screener.go +++ b/pkg/ipfs/screener.go @@ -128,8 +128,9 @@ func (s *Screener) filerReceipts(streamFilters *config.Subscription, response *R if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) { for i, receipt := range payload.Receipts { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) { + receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptBuffer := new(bytes.Buffer) - err := receipt.EncodeRLP(receiptBuffer) + err := receiptForStorage.EncodeRLP(receiptBuffer) if err != nil { return err } diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go index 54e9da67..4f0674f2 100644 --- a/pkg/ipfs/service.go +++ b/pkg/ipfs/service.go @@ -209,8 +209,7 @@ func (sap *Service) processResponse(payload IPLDPayload) error { // Subscribe is used by the API to subscribe to the service loop func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) { log.Info("Subscribing to the seed node service") - sap.Lock() - sap.Subscriptions[id] = Subscription{ + subscription := Subscription{ PayloadChan: sub, QuitChan: quitChan, StreamFilters: streamFilters, @@ -218,40 +217,46 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan ch // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data // Otherwise we only filter new data as it is streamed in from the state diffing geth node if streamFilters.BackFill || streamFilters.BackFillOnly { - log.Debug("back-filling data for id", id) - // Retrieve cached CIDs relevant to this subscriber - cidWrappers, err := sap.Retriever.RetrieveCIDs(*streamFilters) - if err != nil { - log.Error(err) - sap.serve(id, ResponsePayload{ - ErrMsg: "CID retrieval error: " + err.Error(), - }) - return - } - for _, cidWrapper := range cidWrappers { - blocksWrapper, err := sap.Fetcher.FetchCIDs(cidWrapper) - if err != nil { - log.Error(err) - sap.serve(id, ResponsePayload{ - ErrMsg: "IPLD fetching error: " + err.Error(), - }) - return - } - backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) - if err != nil { - log.Error(err) - sap.serve(id, ResponsePayload{ - ErrMsg: "IPLD resolving error: " + err.Error(), - }) - return - } - sap.serve(id, *backFillIplds) - } - if streamFilters.BackFillOnly { - delete(sap.Subscriptions, id) + sap.backFill(subscription, id) + } + if !streamFilters.BackFillOnly { + sap.Lock() + sap.Subscriptions[id] = subscription + sap.Unlock() + } +} + +func (sap *Service) backFill(sub Subscription, id rpc.ID) { + log.Debug("back-filling data for id", id) + // Retrieve cached CIDs relevant to this subscriber + cidWrappers, err := sap.Retriever.RetrieveCIDs(*sub.StreamFilters) + if err != nil { + sub.PayloadChan <- ResponsePayload{ + ErrMsg: "CID retrieval error: " + err.Error(), + } + } + for _, cidWrapper := range cidWrappers { + blocksWrapper, err := sap.Fetcher.FetchCIDs(cidWrapper) + if err != nil { + log.Error(err) + sub.PayloadChan <- ResponsePayload{ + ErrMsg: "IPLD fetching error: " + err.Error(), + } + } + backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) + if err != nil { + log.Error(err) + sub.PayloadChan <- ResponsePayload{ + ErrMsg: "IPLD resolving error: " + err.Error(), + } + } + select { + case sub.PayloadChan <- *backFillIplds: + log.Infof("sending seed node back-fill payload to subscription %s", id) + default: + log.Infof("unable to send back-fill ppayload to subscription %s; channel has no receiver", id) } } - sap.Unlock() } // Unsubscribe is used to unsubscribe to the StateDiffingService loop diff --git a/vendor/github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_receipts/dag_putter.go b/vendor/github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_receipts/dag_putter.go index 82d13b27..b8bb16f3 100644 --- a/vendor/github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_receipts/dag_putter.go +++ b/vendor/github.com/vulcanize/eth-block-extractor/pkg/ipfs/eth_block_receipts/dag_putter.go @@ -22,7 +22,8 @@ func (dagPutter *EthBlockReceiptDagPutter) DagPut(raw interface{}) ([]string, er input := raw.(types.Receipts) var output []string for _, r := range input { - node, err := getReceiptNode(r) + receiptForStorage := (*types.ReceiptForStorage)(r) + node, err := getReceiptNode(receiptForStorage) if err != nil { return nil, err } @@ -35,7 +36,7 @@ func (dagPutter *EthBlockReceiptDagPutter) DagPut(raw interface{}) ([]string, er return output, nil } -func getReceiptNode(receipt *types.Receipt) (*EthReceiptNode, error) { +func getReceiptNode(receipt *types.ReceiptForStorage) (*EthReceiptNode, error) { buffer := new(bytes.Buffer) err := receipt.EncodeRLP(buffer) if err != nil { @@ -50,4 +51,4 @@ func getReceiptNode(receipt *types.Receipt) (*EthReceiptNode, error) { cid: receiptCid, } return node, nil -} +} \ No newline at end of file