work on wasm watchers

This commit is contained in:
Ian Norden 2020-02-23 17:14:29 -06:00
parent e3e8700d34
commit 25aa4634e9
32 changed files with 683 additions and 339 deletions

View File

@ -67,7 +67,11 @@ func streamEthSubscription() {
payloadChan := make(chan super_node.SubscriptionPayload, 20000)
// Subscribe to the super node service with the given config/filter parameters
sub, err := str.Stream(payloadChan, ethSubConfig)
rlpParams, err := rlp.EncodeToBytes(ethSubConfig)
if err != nil {
logWithCommand.Fatal(err)
}
sub, err := str.Stream(payloadChan, rlpParams)
if err != nil {
logWithCommand.Fatal(err)
}
@ -83,17 +87,16 @@ func streamEthSubscription() {
var ethData eth.IPLDs
if err := rlp.DecodeBytes(payload.Data, &ethData); err != nil {
logWithCommand.Error(err)
continue
}
for _, headerRlp := range ethData.Headers {
var header types.Header
err = rlp.Decode(bytes.NewBuffer(headerRlp.Data), &header)
if err != nil {
logWithCommand.Error(err)
continue
}
fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex())
fmt.Printf("header: %v\n", header)
var header types.Header
err = rlp.Decode(bytes.NewBuffer(ethData.Header.Data), &header)
if err != nil {
logWithCommand.Error(err)
continue
}
fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex())
fmt.Printf("header: %v\n", header)
for _, trxRlp := range ethData.Transactions {
var trx types.Transaction
buff := bytes.NewBuffer(trxRlp.Data)
@ -107,7 +110,7 @@ func streamEthSubscription() {
fmt.Printf("trx: %v\n", trx)
}
for _, rctRlp := range ethData.Receipts {
var rct types.ReceiptForStorage
var rct types.Receipt
buff := bytes.NewBuffer(rctRlp.Data)
stream := rlp.NewStream(buff, 0)
err = rct.DecodeRLP(stream)

43
cmd/watch.go Normal file
View File

@ -0,0 +1,43 @@
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
// watchCmd represents the watch command
var watchCmd = &cobra.Command{
Use: "watch",
Short: "Watch and transform data from a chain source",
Long: `This command allows one to configure a set of wasm functions and SQL trigger functions
that call them to watch and transform data from the specified chain source.
A watcher is composed of four parts:
1) Go execution engine- this command- which fetches raw chain data and adds it to the Postres queued ready data tables
2) TOML config file which specifies what subset of chain data to fetch and from where and contains references to the below
3) Set of WASM binaries which are loaded into Postgres and used by
4) Set of PostgreSQL trigger functions which automatically act on data as it is inserted into the queued ready data tables`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("watch called")
},
}
func init() {
rootCmd.AddCommand(watchCmd)
}

View File

@ -0,0 +1,9 @@
-- +goose Up
CREATE TABLE eth.queue_data (
id SERIAL PRIMARY KEY,
data BYTEA NOT NULL,
height BIGINT UNIQUE NOT NULL
);
-- +goose Down
DROP TABLE eth.queue_data;

View File

@ -0,0 +1,9 @@
-- +goose Up
CREATE TABLE btc.queue_data (
id SERIAL PRIMARY KEY,
data BYTEA NOT NULL,
height BIGINT UNIQUE NOT NULL
);
-- +goose Down
DROP TABLE btc.queue_data;

View File

@ -68,6 +68,37 @@ CREATE SEQUENCE btc.header_cids_id_seq
ALTER SEQUENCE btc.header_cids_id_seq OWNED BY btc.header_cids.id;
--
-- Name: queue_data; Type: TABLE; Schema: btc; Owner: -
--
CREATE TABLE btc.queue_data (
id integer NOT NULL,
data bytea NOT NULL,
height bigint NOT NULL
);
--
-- Name: queue_data_id_seq; Type: SEQUENCE; Schema: btc; Owner: -
--
CREATE SEQUENCE btc.queue_data_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: queue_data_id_seq; Type: SEQUENCE OWNED BY; Schema: btc; Owner: -
--
ALTER SEQUENCE btc.queue_data_id_seq OWNED BY btc.queue_data.id;
--
-- Name: transaction_cids; Type: TABLE; Schema: btc; Owner: -
--
@ -185,7 +216,8 @@ CREATE TABLE eth.header_cids (
parent_hash character varying(66) NOT NULL,
cid text NOT NULL,
td numeric NOT NULL,
node_id integer NOT NULL
node_id integer NOT NULL,
reward numeric NOT NULL
);
@ -209,6 +241,37 @@ CREATE SEQUENCE eth.header_cids_id_seq
ALTER SEQUENCE eth.header_cids_id_seq OWNED BY eth.header_cids.id;
--
-- Name: queue_data; Type: TABLE; Schema: eth; Owner: -
--
CREATE TABLE eth.queue_data (
id integer NOT NULL,
data bytea NOT NULL,
height bigint NOT NULL
);
--
-- Name: queue_data_id_seq; Type: SEQUENCE; Schema: eth; Owner: -
--
CREATE SEQUENCE eth.queue_data_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: queue_data_id_seq; Type: SEQUENCE OWNED BY; Schema: eth; Owner: -
--
ALTER SEQUENCE eth.queue_data_id_seq OWNED BY eth.queue_data.id;
--
-- Name: receipt_cids; Type: TABLE; Schema: eth; Owner: -
--
@ -355,7 +418,8 @@ CREATE TABLE eth.uncle_cids (
header_id integer NOT NULL,
block_hash character varying(66) NOT NULL,
parent_hash character varying(66) NOT NULL,
cid text NOT NULL
cid text NOT NULL,
reward numeric NOT NULL
);
@ -771,6 +835,13 @@ ALTER SEQUENCE public.watched_logs_id_seq OWNED BY public.watched_logs.id;
ALTER TABLE ONLY btc.header_cids ALTER COLUMN id SET DEFAULT nextval('btc.header_cids_id_seq'::regclass);
--
-- Name: queue_data id; Type: DEFAULT; Schema: btc; Owner: -
--
ALTER TABLE ONLY btc.queue_data ALTER COLUMN id SET DEFAULT nextval('btc.queue_data_id_seq'::regclass);
--
-- Name: transaction_cids id; Type: DEFAULT; Schema: btc; Owner: -
--
@ -799,6 +870,13 @@ ALTER TABLE ONLY btc.tx_outputs ALTER COLUMN id SET DEFAULT nextval('btc.tx_outp
ALTER TABLE ONLY eth.header_cids ALTER COLUMN id SET DEFAULT nextval('eth.header_cids_id_seq'::regclass);
--
-- Name: queue_data id; Type: DEFAULT; Schema: eth; Owner: -
--
ALTER TABLE ONLY eth.queue_data ALTER COLUMN id SET DEFAULT nextval('eth.queue_data_id_seq'::regclass);
--
-- Name: receipt_cids id; Type: DEFAULT; Schema: eth; Owner: -
--
@ -927,6 +1005,22 @@ ALTER TABLE ONLY btc.header_cids
ADD CONSTRAINT header_cids_pkey PRIMARY KEY (id);
--
-- Name: queue_data queue_data_height_key; Type: CONSTRAINT; Schema: btc; Owner: -
--
ALTER TABLE ONLY btc.queue_data
ADD CONSTRAINT queue_data_height_key UNIQUE (height);
--
-- Name: queue_data queue_data_pkey; Type: CONSTRAINT; Schema: btc; Owner: -
--
ALTER TABLE ONLY btc.queue_data
ADD CONSTRAINT queue_data_pkey PRIMARY KEY (id);
--
-- Name: transaction_cids transaction_cids_pkey; Type: CONSTRAINT; Schema: btc; Owner: -
--
@ -991,6 +1085,22 @@ ALTER TABLE ONLY eth.header_cids
ADD CONSTRAINT header_cids_pkey PRIMARY KEY (id);
--
-- Name: queue_data queue_data_height_key; Type: CONSTRAINT; Schema: eth; Owner: -
--
ALTER TABLE ONLY eth.queue_data
ADD CONSTRAINT queue_data_height_key UNIQUE (height);
--
-- Name: queue_data queue_data_pkey; Type: CONSTRAINT; Schema: eth; Owner: -
--
ALTER TABLE ONLY eth.queue_data
ADD CONSTRAINT queue_data_pkey PRIMARY KEY (id);
--
-- Name: receipt_cids receipt_cids_pkey; Type: CONSTRAINT; Schema: eth; Owner: -
--

View File

@ -17,7 +17,7 @@ The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubs
```toml
[superNode]
[superNode.ethSubscription]
historicalData = true
historicalData = false
historicalDataOnly = false
startingBlock = 0
endingBlock = 0
@ -27,26 +27,18 @@ The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubs
uncles = false
[superNode.ethSubscription.txFilter]
off = false
src = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe",
]
dst = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe",
]
src = []
dst = []
[superNode.ethSubscription.receiptFilter]
off = false
contracts = []
topics = [
[
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377"
]
]
topic0s = []
topic1s = []
topic2s = []
topic3s = []
[superNode.ethSubscription.stateFilter]
off = false
addresses = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"
]
addresses = []
intermediateNodes = false
[superNode.ethSubscription.storageFilter]
off = true

View File

@ -1,6 +1,6 @@
[superNode]
[superNode.ethSubscription]
historicalData = true
historicalData = false
historicalDataOnly = false
startingBlock = 0
endingBlock = 0
@ -10,26 +10,18 @@
uncles = false
[superNode.ethSubscription.txFilter]
off = false
src = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe",
]
dst = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe",
]
src = []
dst = []
[superNode.ethSubscription.receiptFilter]
off = false
contracts = []
topics = [
[
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377"
]
]
topic0s = []
topic1s = []
topic2s = []
topic3s = []
[superNode.ethSubscription.stateFilter]
off = false
addresses = [
"0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe"
]
addresses = []
intermediateNodes = false
[superNode.ethSubscription.storageFilter]
off = true

View File

@ -22,7 +22,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// SuperNodeStreamer is the underlying struct for the shared.SuperNodeStreamer interface
@ -38,6 +37,6 @@ func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer {
}
// Stream is the main loop for subscribing to data from a vulcanizedb super node
func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vdb", payloadChan, "stream", params)
func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vdb", payloadChan, "stream", rlpParams)
}

View File

@ -19,12 +19,15 @@ package super_node
import (
"context"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// APIName is the namespace used for the state diffing service API
@ -46,7 +49,24 @@ func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI {
}
// Stream is the public method to setup a subscription that fires off super node payloads as they are processed
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params shared.SubscriptionSettings) (*rpc.Subscription, error) {
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) {
var params shared.SubscriptionSettings
switch api.sn.Chain() {
case shared.Ethereum:
var ethParams eth.SubscriptionSettings
if err := rlp.DecodeBytes(rlpParams, &ethParams); err != nil {
return nil, err
}
params = &ethParams
case shared.Bitcoin:
var btcParams btc.SubscriptionSettings
if err := rlp.DecodeBytes(rlpParams, &btcParams); err != nil {
return nil, err
}
params = &btcParams
default:
panic("SuperNode is not configured for a specific chain type")
}
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {

View File

@ -72,10 +72,10 @@ func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IP
if err != nil {
return err
}
response.Headers = append(response.Headers, ipfs.BlockModel{
response.Header = ipfs.BlockModel{
Data: data,
CID: cid.String(),
})
}
}
return nil
}

View File

@ -60,7 +60,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers)
iplds.Header, err = f.FetchHeader(cidWrapper.Header)
if err != nil {
return nil, err
}
@ -72,30 +72,21 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
}
// FetchHeaders fetches headers
// It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching header iplds")
headerCids := make([]cid.Cid, len(cids))
for i, c := range cids {
dc, err := cid.Decode(c.CID)
if err != nil {
return nil, err
}
headerCids[i] = dc
// It uses the f.fetch method
func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
dc, err := cid.Decode(c.CID)
if err != nil {
return ipfs.BlockModel{}, err
}
headers := f.fetchBatch(headerCids)
headerIPLDs := make([]ipfs.BlockModel, len(headers))
for i, header := range headers {
headerIPLDs[i] = ipfs.BlockModel{
Data: header.RawData(),
CID: header.Cid().String(),
}
header, err := f.fetch(dc)
if err != nil {
return ipfs.BlockModel{}, err
}
if len(headerIPLDs) != len(headerCids) {
log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
return headerIPLDs, errUnexpectedNumberOfIPLDs
}
return headerIPLDs, nil
return ipfs.BlockModel{
Data: header.RawData(),
CID: header.Cid().String(),
}, nil
}
// FetchTrxs fetches transactions

View File

@ -57,7 +57,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
}
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
streamFilter, ok := filter.(*SubscriptionSettings)
if !ok {
return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
@ -68,38 +68,42 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
return nil, true, err
}
cw := new(CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
// Retrieve cached header CIDs
if !streamFilter.HeaderFilter.Off {
cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return nil, true, err
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return nil, true, err
}
// Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("transaction cid retrieval error")
return nil, true, err
cws := make([]shared.CIDsForFetching, len(headers))
empty := true
for i, header := range headers {
cw := new(CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
if !streamFilter.HeaderFilter.Off {
cw.Header = header
empty = false
}
// Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("transaction cid retrieval error")
return nil, true, err
}
if len(cw.Transactions) > 0 {
empty = false
}
}
cws[i] = cw
}
return cw, empty(cw), tx.Commit()
}
func empty(cidWrapper *CIDWrapper) bool {
if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 {
return false
}
return true
return cws, empty, tx.Commit()
}
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
@ -111,32 +115,10 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H
return headers, tx.Select(&headers, pgStr, blockNumber)
}
/*
type TxModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
SegWit bool `db:"segwit"`
WitnessHash string `db:"witness_hash"`
}
// TxFilter contains filter settings for txs
type TxFilter struct {
Off bool
Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to
Segwit bool // allow filtering for segwit trxs
WitnessHashes []string // allow filtering for specific witness hashes
PkScriptClass uint8 // allow filtering for txs that have at least one tx output with the specified pkscript class
MultiSig bool // allow filtering for txs that have at least one tx output that requires more than one signature
Addresses []string // allow filtering for txs that have at least one tx output with at least one of the provided addresses
}
*/
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for block ", blockNumber)
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3)
results := make([]TxModel, 0)
id := 1
@ -147,8 +129,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum
WHERE transaction_cids.header_id = header_cids.id
AND tx_inputs.tx_id = transaction_cids.id
AND tx_outputs.tx_id = transaction_cids.id
AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
AND header_cids.id = $%d`, id)
args = append(args, headerID)
id++
if txFilter.Segwit {
pgStr += ` AND transaction_cids.segwit = true`

View File

@ -58,7 +58,7 @@ type CIDPayload struct {
// Passed to IPLDFetcher
type CIDWrapper struct {
BlockNumber *big.Int
Headers []HeaderModel
Header HeaderModel
Transactions []TxModel
}
@ -66,7 +66,7 @@ type CIDWrapper struct {
// Returned by IPLDFetcher and ResponseFilterer
type IPLDs struct {
BlockNumber *big.Int
Headers []ipfs.BlockModel
Header ipfs.BlockModel
Transactions []ipfs.BlockModel
}

View File

@ -86,7 +86,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
return nil, fmt.Errorf("header at block %d is not available", number)
}
// Fetch the header IPLDs for those CIDs
headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCids[0]})
headerIPLD, err := b.Fetcher.FetchHeader(headerCids[0])
if err != nil {
return nil, err
}
@ -94,7 +94,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
// We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already
// confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing
var header types.Header
if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil {
if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil {
return nil, err
}
return &header, nil
@ -172,12 +172,12 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
}
// Fetch and decode the header IPLD
headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID})
headerIPLD, err := b.Fetcher.FetchHeader(headerCID)
if err != nil {
return nil, err
}
var header types.Header
if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil {
if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs
@ -232,12 +232,12 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
return nil, err
}
// Fetch and decode the header IPLD
headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID})
headerIPLD, err := b.Fetcher.FetchHeader(headerCID)
if err != nil {
return nil, err
}
var header types.Header
if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil {
if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs

View File

@ -87,10 +87,10 @@ func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IP
if err != nil {
return err
}
response.Headers = append(response.Headers, ipfs.BlockModel{
response.Header = ipfs.BlockModel{
Data: headerRLP,
CID: cid.String(),
})
}
if headerFilter.Uncles {
response.Uncles = make([]ipfs.BlockModel, len(payload.Block.Body().Uncles))
for i, uncle := range payload.Block.Body().Uncles {
@ -126,6 +126,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLD
trxHashes = make([]common.Hash, 0, trxLen)
response.Transactions = make([]ipfs.BlockModel, 0, trxLen)
for i, trx := range payload.Block.Body().Transactions {
// TODO: check if want corresponding receipt and if we do we must include this transaction
if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
trxBuffer := new(bytes.Buffer)
if err := trx.EncodeRLP(trxBuffer); err != nil {

View File

@ -43,7 +43,7 @@ var _ = Describe("Filterer", func() {
iplds, ok := payload.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(iplds.Headers).To(Equal(mocks.MockIPLDs.Headers))
Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header))
var expectedEmptyUncles []ipfs.BlockModel
Expect(iplds.Uncles).To(Equal(expectedEmptyUncles))
Expect(len(iplds.Transactions)).To(Equal(2))
@ -77,7 +77,7 @@ var _ = Describe("Filterer", func() {
iplds1, ok := payload1.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds1.Headers)).To(Equal(0))
Expect(iplds1.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds1.Uncles)).To(Equal(0))
Expect(len(iplds1.Transactions)).To(Equal(0))
Expect(len(iplds1.StorageNodes)).To(Equal(0))
@ -93,7 +93,7 @@ var _ = Describe("Filterer", func() {
iplds2, ok := payload2.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds2.Headers)).To(Equal(0))
Expect(iplds2.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds2.Uncles)).To(Equal(0))
Expect(len(iplds2.Transactions)).To(Equal(0))
Expect(len(iplds2.StorageNodes)).To(Equal(0))
@ -109,7 +109,7 @@ var _ = Describe("Filterer", func() {
iplds3, ok := payload3.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds3.Headers)).To(Equal(0))
Expect(iplds3.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds3.Uncles)).To(Equal(0))
Expect(len(iplds3.Transactions)).To(Equal(0))
Expect(len(iplds3.StorageNodes)).To(Equal(0))
@ -125,7 +125,7 @@ var _ = Describe("Filterer", func() {
iplds4, ok := payload4.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds4.Headers)).To(Equal(0))
Expect(iplds4.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds4.Uncles)).To(Equal(0))
Expect(len(iplds4.Transactions)).To(Equal(0))
Expect(len(iplds4.StorageNodes)).To(Equal(0))
@ -141,7 +141,7 @@ var _ = Describe("Filterer", func() {
iplds5, ok := payload5.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds5.Headers)).To(Equal(0))
Expect(iplds5.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds5.Uncles)).To(Equal(0))
Expect(len(iplds5.Transactions)).To(Equal(2))
Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
@ -157,7 +157,7 @@ var _ = Describe("Filterer", func() {
iplds6, ok := payload6.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds6.Headers)).To(Equal(0))
Expect(iplds6.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds6.Uncles)).To(Equal(0))
Expect(len(iplds6.Transactions)).To(Equal(1))
Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
@ -174,7 +174,7 @@ var _ = Describe("Filterer", func() {
iplds7, ok := payload7.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds7.Headers)).To(Equal(0))
Expect(iplds7.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds7.Uncles)).To(Equal(0))
Expect(len(iplds7.Transactions)).To(Equal(0))
Expect(len(iplds7.StorageNodes)).To(Equal(0))
@ -191,7 +191,7 @@ var _ = Describe("Filterer", func() {
iplds8, ok := payload8.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(iplds8.Headers)).To(Equal(0))
Expect(iplds8.Header).To(Equal(ipfs.BlockModel{}))
Expect(len(iplds8.Uncles)).To(Equal(0))
Expect(len(iplds8.Transactions)).To(Equal(0))
Expect(len(iplds8.StorageNodes)).To(Equal(0))

View File

@ -55,6 +55,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing header")
return err
}
for _, uncle := range cidPayload.UncleCIDs {
@ -62,6 +63,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing uncle")
return err
}
}
@ -69,12 +71,14 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing transactions and receipts")
return err
}
if err := in.indexStateAndStorageCIDs(tx, cidPayload, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing state and storage nodes")
return err
}
return tx.Commit()
@ -92,7 +96,7 @@ func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel, nodeID int
func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int64) error {
_, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward) = ($3, $4, $5)`,
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID)
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward)
return err
}

View File

@ -61,7 +61,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers)
iplds.Header, err = f.FetchHeader(cidWrapper.Header)
if err != nil {
return nil, err
}
@ -89,30 +89,21 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
}
// FetchHeaders fetches headers
// It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching header iplds")
headerCids := make([]cid.Cid, len(cids))
for i, c := range cids {
dc, err := cid.Decode(c.CID)
if err != nil {
return nil, err
}
headerCids[i] = dc
// It uses the f.fetch method
func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
dc, err := cid.Decode(c.CID)
if err != nil {
return ipfs.BlockModel{}, err
}
headers := f.fetchBatch(headerCids)
headerIPLDs := make([]ipfs.BlockModel, len(headers))
for i, header := range headers {
headerIPLDs[i] = ipfs.BlockModel{
Data: header.RawData(),
CID: header.Cid().String(),
}
header, err := f.fetch(dc)
if err != nil {
return ipfs.BlockModel{}, err
}
if len(headerIPLDs) != len(headerCids) {
log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
return headerIPLDs, errUnexpectedNumberOfIPLDs
}
return headerIPLDs, nil
return ipfs.BlockModel{
Data: header.RawData(),
CID: header.Cid().String(),
}, nil
}
// FetchUncles fetches uncles

View File

@ -50,10 +50,8 @@ var (
mockBlockService *mocks.MockIPFSBlockService
mockCIDWrapper = &eth.CIDWrapper{
BlockNumber: big.NewInt(9000),
Headers: []eth.HeaderModel{
{
CID: mockHeaderBlock.Cid().String(),
},
Header: eth.HeaderModel{
CID: mockHeaderBlock.Cid().String(),
},
Uncles: []eth.UncleModel{
{
@ -107,8 +105,7 @@ var _ = Describe("Fetcher", func() {
iplds, ok := i.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber))
Expect(len(iplds.Headers)).To(Equal(1))
Expect(iplds.Headers[0]).To(Equal(ipfs.BlockModel{
Expect(iplds.Header).To(Equal(ipfs.BlockModel{
Data: mockHeaderBlock.RawData(),
CID: mockHeaderBlock.Cid().String(),
}))

View File

@ -296,15 +296,13 @@ var (
MockCIDWrapper = &eth.CIDWrapper{
BlockNumber: big.NewInt(1),
Headers: []eth2.HeaderModel{
{
BlockNumber: "1",
BlockHash: MockBlock.Hash().String(),
ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000",
CID: HeaderCID.String(),
TotalDifficulty: MockBlock.Difficulty().String(),
Reward: "5000000000000000000",
},
Header: eth2.HeaderModel{
BlockNumber: "1",
BlockHash: MockBlock.Hash().String(),
ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000",
CID: HeaderCID.String(),
TotalDifficulty: MockBlock.Difficulty().String(),
Reward: "5000000000000000000",
},
Transactions: MockTrxMetaPostPublsh,
Receipts: MockRctMetaPostPublish,
@ -331,11 +329,9 @@ var (
MockIPLDs = eth.IPLDs{
BlockNumber: big.NewInt(1),
Headers: []ipfs.BlockModel{
{
Data: HeaderIPLD.RawData(),
CID: HeaderIPLD.Cid().String(),
},
Header: ipfs.BlockModel{
Data: HeaderIPLD.RawData(),
CID: HeaderIPLD.Cid().String(),
},
Transactions: []ipfs.BlockModel{
{

View File

@ -57,7 +57,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
}
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
streamFilter, ok := filter.(*SubscriptionSettings)
if !ok {
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
@ -68,21 +68,26 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
return nil, true, err
}
cw := new(CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
// Retrieve cached header CIDs
if !streamFilter.HeaderFilter.Off {
cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return nil, true, err
// Retrieve cached header CIDs at this block height
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
if streamFilter.HeaderFilter.Uncles {
for _, headerCID := range cw.Headers {
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
log.Error("header cid retrieval error")
return nil, true, err
}
cws := make([]shared.CIDsForFetching, len(headers))
empty := true
for i, header := range headers {
cw := new(CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
if !streamFilter.HeaderFilter.Off {
cw.Header = header
empty = false
if streamFilter.HeaderFilter.Uncles {
// Retrieve uncle cids for this header id
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
@ -90,66 +95,73 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
log.Error("uncle cid retrieval error")
return nil, true, err
}
cw.Uncles = append(cw.Uncles, uncleCIDs...)
cw.Uncles = uncleCIDs
}
}
}
// Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
// Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("transaction cid retrieval error")
return nil, true, err
}
log.Error("transaction cid retrieval error")
return nil, true, err
}
}
trxIds := make([]int64, 0, len(cw.Transactions))
for _, tx := range cw.Transactions {
trxIds = append(trxIds, tx.ID)
}
// Retrieve cached receipt CIDs
if !streamFilter.ReceiptFilter.Off {
cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilter.ReceiptFilter, blockNumber, nil, trxIds)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
if len(cw.Transactions) > 0 {
empty = false
}
log.Error("receipt cid retrieval error")
return nil, true, err
}
}
// Retrieve cached state CIDs
if !streamFilter.StateFilter.Off {
cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
trxIds := make([]int64, len(cw.Transactions))
for j, tx := range cw.Transactions {
trxIds[j] = tx.ID
}
// Retrieve cached receipt CIDs
if !streamFilter.ReceiptFilter.Off {
cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, streamFilter.ReceiptFilter, header.ID, trxIds)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("receipt cid retrieval error")
return nil, true, err
}
log.Error("state cid retrieval error")
return nil, true, err
}
}
// Retrieve cached storage CIDs
if !streamFilter.StorageFilter.Off {
cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
if len(cw.Receipts) > 0 {
empty = false
}
log.Error("storage cid retrieval error")
return nil, true, err
}
// Retrieve cached state CIDs
if !streamFilter.StateFilter.Off {
cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("state cid retrieval error")
return nil, true, err
}
if len(cw.StateNodes) > 0 {
empty = false
}
}
// Retrieve cached storage CIDs
if !streamFilter.StorageFilter.Off {
cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("storage cid retrieval error")
return nil, true, err
}
if len(cw.StorageNodes) > 0 {
empty = false
}
}
cws[i] = cw
}
return cw, empty(cw), tx.Commit()
}
func empty(cidWrapper *CIDWrapper) bool {
if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 || len(cidWrapper.Uncles) > 0 || len(cidWrapper.Receipts) > 0 || len(cidWrapper.StateNodes) > 0 || len(cidWrapper.StorageNodes) > 0 {
return false
}
return true
return cws, empty, tx.Commit()
}
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
@ -172,8 +184,8 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for block ", blockNumber)
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3)
results := make([]TxModel, 0)
id := 1
@ -181,8 +193,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum
transaction_cids.tx_hash, transaction_cids.cid,
transaction_cids.dst, transaction_cids.src, transaction_cids.index
FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
WHERE header_cids.id = $%d`, id)
args = append(args, headerID)
id++
if len(txFilter.Dst) > 0 {
pgStr += fmt.Sprintf(` AND transaction_cids.dst = ANY($%d::VARCHAR(66)[])`, id)
@ -197,18 +209,98 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum
return results, tx.Select(&results, pgStr, args...)
}
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
id := 1
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid,
receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s,
receipt_cids.topic2s, receipt_cids.topic3s
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.id = $1`
id := 2
args = append(args, headerID)
if len(rctFilter.Contracts) > 0 {
// Filter on contract addresses if there are any
pgStr += fmt.Sprintf(` AND ((receipt_cids.contract = ANY($%d::VARCHAR(66)[])`, id)
args = append(args, pq.Array(rctFilter.Contracts))
id++
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr += " AND ("
first := true
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")"
}
pgStr += ")"
// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else { // If there are no contract addresses to filter on
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr += " AND (("
first := true
for i, topicSet := range rctFilter.Topics {
if i < 4 && len(topicSet) > 0 {
if first {
pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
first = false
} else {
pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id)
}
args = append(args, pq.Array(topicSet))
id++
}
}
pgStr += ")"
// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else if rctFilter.MatchTxs && len(trxIds) > 0 {
// If there are no contract addresses or topics to filter on,
// Filter on txIDs if there are any and we are matching txs
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
args = append(args, pq.Array(trxIds))
}
}
pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...)
}
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid,
receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s,
receipt_cids.topic2s, receipt_cids.topic3s
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id`
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
@ -294,15 +386,15 @@ func hasTopics(topics [][]string) bool {
return false
}
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, blockNumber int64) ([]StateNodeModel, error) {
log.Debug("retrieving state cids for block ", blockNumber)
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]StateNodeModel, error) {
log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id,
state_cids.state_key, state_cids.leaf, state_cids.cid
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
args = append(args, blockNumber)
WHERE header_cids.id = $1`
args = append(args, headerID)
addrLen := len(stateFilter.Addresses)
if addrLen > 0 {
keys := make([]string, addrLen)
@ -319,18 +411,17 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...)
}
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for block ", blockNumber)
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3)
id := 1
pgStr := fmt.Sprintf(`SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key,
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key,
storage_cids.leaf, storage_cids.cid, state_cids.state_key FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
id++
AND header_cids.id = $1`
args = append(args, headerID)
id := 2
addrLen := len(storageFilter.Addresses)
if addrLen > 0 {
keys := make([]string, addrLen)

View File

@ -230,14 +230,14 @@ var _ = Describe("Retriever", func() {
cids, empty, err := retriever.Retrieve(openFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper, ok := cids.(*eth.CIDWrapper)
Expect(len(cids)).To(Equal(1))
cidWrapper, ok := cids[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper.Headers)).To(Equal(1))
expectedHeaderCIDs := mocks.MockCIDWrapper.Headers
expectedHeaderCIDs[0].ID = cidWrapper.Headers[0].ID
expectedHeaderCIDs[0].NodeID = cidWrapper.Headers[0].NodeID
Expect(cidWrapper.Headers).To(Equal(expectedHeaderCIDs))
expectedHeaderCID := mocks.MockCIDWrapper.Header
expectedHeaderCID.ID = cidWrapper.Header.ID
expectedHeaderCID.NodeID = cidWrapper.Header.NodeID
Expect(cidWrapper.Header).To(Equal(expectedHeaderCID))
Expect(len(cidWrapper.Transactions)).To(Equal(2))
Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue())
@ -266,10 +266,11 @@ var _ = Describe("Retriever", func() {
cids1, empty, err := retriever.Retrieve(rctContractFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper1, ok := cids1.(*eth.CIDWrapper)
Expect(len(cids1)).To(Equal(1))
cidWrapper1, ok := cids1[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper1.Headers)).To(Equal(0))
Expect(cidWrapper1.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper1.Transactions)).To(Equal(0))
Expect(len(cidWrapper1.StateNodes)).To(Equal(0))
Expect(len(cidWrapper1.StorageNodes)).To(Equal(0))
@ -282,10 +283,11 @@ var _ = Describe("Retriever", func() {
cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper2, ok := cids2.(*eth.CIDWrapper)
Expect(len(cids2)).To(Equal(1))
cidWrapper2, ok := cids2[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper2.Headers)).To(Equal(0))
Expect(cidWrapper2.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper2.Transactions)).To(Equal(0))
Expect(len(cidWrapper2.StateNodes)).To(Equal(0))
Expect(len(cidWrapper2.StorageNodes)).To(Equal(0))
@ -298,10 +300,11 @@ var _ = Describe("Retriever", func() {
cids3, empty, err := retriever.Retrieve(rctTopicsAndContractFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper3, ok := cids3.(*eth.CIDWrapper)
Expect(len(cids3)).To(Equal(1))
cidWrapper3, ok := cids3[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper3.Headers)).To(Equal(0))
Expect(cidWrapper3.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper3.Transactions)).To(Equal(0))
Expect(len(cidWrapper3.StateNodes)).To(Equal(0))
Expect(len(cidWrapper3.StorageNodes)).To(Equal(0))
@ -314,10 +317,11 @@ var _ = Describe("Retriever", func() {
cids4, empty, err := retriever.Retrieve(rctContractsAndTopicFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper4, ok := cids4.(*eth.CIDWrapper)
Expect(len(cids4)).To(Equal(1))
cidWrapper4, ok := cids4[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper4.Headers)).To(Equal(0))
Expect(cidWrapper4.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper4.Transactions)).To(Equal(0))
Expect(len(cidWrapper4.StateNodes)).To(Equal(0))
Expect(len(cidWrapper4.StorageNodes)).To(Equal(0))
@ -330,10 +334,11 @@ var _ = Describe("Retriever", func() {
cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper5, ok := cids5.(*eth.CIDWrapper)
Expect(len(cids5)).To(Equal(1))
cidWrapper5, ok := cids5[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper5.Headers)).To(Equal(0))
Expect(cidWrapper5.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper5.Transactions)).To(Equal(2))
Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx1CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx2CID.String())).To(BeTrue())
@ -346,10 +351,11 @@ var _ = Describe("Retriever", func() {
cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper6, ok := cids6.(*eth.CIDWrapper)
Expect(len(cids6)).To(Equal(1))
cidWrapper6, ok := cids6[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper6.Headers)).To(Equal(0))
Expect(cidWrapper6.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper6.Transactions)).To(Equal(1))
expectedTxCID := mocks.MockCIDWrapper.Transactions[1]
expectedTxCID.ID = cidWrapper6.Transactions[0].ID
@ -366,10 +372,11 @@ var _ = Describe("Retriever", func() {
cids7, empty, err := retriever.Retrieve(stateFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
cidWrapper7, ok := cids7.(*eth.CIDWrapper)
Expect(len(cids7)).To(Equal(1))
cidWrapper7, ok := cids7[0].(*eth.CIDWrapper)
Expect(ok).To(BeTrue())
Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber))
Expect(len(cidWrapper7.Headers)).To(Equal(0))
Expect(cidWrapper7.Header).To(Equal(eth.HeaderModel{}))
Expect(len(cidWrapper7.Transactions)).To(Equal(0))
Expect(len(cidWrapper7.Receipts)).To(Equal(0))
Expect(len(cidWrapper7.StorageNodes)).To(Equal(0))

View File

@ -17,7 +17,6 @@
package eth
import (
"errors"
"math/big"
"github.com/spf13/viper"
@ -53,7 +52,8 @@ type TxFilter struct {
// ReceiptFilter contains filter settings for receipts
type ReceiptFilter struct {
Off bool
Off bool
// TODO: change this so that we filter for receipts first and we always return the corresponding transaction
MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions
Contracts []string
Topics [][]string
@ -70,7 +70,7 @@ type StateFilter struct {
type StorageFilter struct {
Off bool
Addresses []string
StorageKeys []string
StorageKeys []string // need to be the hashs key themselves not slot position
IntermediateNodes bool
}
@ -96,13 +96,12 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
Src: viper.GetStringSlice("superNode.ethSubscription.txFilter.src"),
Dst: viper.GetStringSlice("superNode.ethSubscription.txFilter.dst"),
}
// Below defaults to false and one slice of length 0
// Which means we get all receipts by default
t := viper.Get("superNode.ethSubscription.receiptFilter.topics")
topics, ok := t.([][]string)
if !ok {
return nil, errors.New("superNode.ethSubscription.receiptFilter.topics needs to be a slice of string slices")
}
// By default all of the topic slices will be empty => match on any/all topics
topics := make([][]string, 4)
topics[0] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic0s")
topics[1] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic1s")
topics[2] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic2s")
topics[3] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic3s")
sc.ReceiptFilter = ReceiptFilter{
Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"),
MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"),

View File

@ -67,7 +67,7 @@ type CIDPayload struct {
// Passed to IPLDFetcher
type CIDWrapper struct {
BlockNumber *big.Int
Headers []HeaderModel
Header HeaderModel
Uncles []UncleModel
Transactions []TxModel
Receipts []ReceiptModel
@ -79,7 +79,7 @@ type CIDWrapper struct {
// Returned by IPLDFetcher and ResponseFilterer
type IPLDs struct {
BlockNumber *big.Int
Headers []ipfs.BlockModel
Header ipfs.BlockModel
Uncles []ipfs.BlockModel
Transactions []ipfs.BlockModel
Receipts []ipfs.BlockModel

View File

@ -53,6 +53,8 @@ type SuperNode interface {
Unsubscribe(id rpc.ID)
// Method to access the node info for the service
Node() core.Node
// Method to access chain type
Chain() shared.ChainType
}
// Service is the underlying struct for the super node
@ -363,7 +365,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
log.Debug("historical data ending block:", endingBlock)
go func() {
for i := startingBlock; i <= endingBlock; i++ {
cidWrapper, empty, err := sap.Retriever.Retrieve(params, i)
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("CID Retrieval error at block %d\r%s", i, err.Error()))
continue
@ -371,21 +373,23 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
if empty {
continue
}
response, err := sap.IPLDFetcher.Fetch(cidWrapper)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error()))
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Error(err)
continue
}
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node historical data payload to subscription %s", id)
default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
for _, cids := range cidWrappers {
response, err := sap.IPLDFetcher.Fetch(cids)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error()))
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Error(err)
continue
}
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node historical data payload to subscription %s", id)
default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
}
}
}
// when we are done backfilling send an empty payload signifying so in the msg
@ -443,6 +447,11 @@ func (sap *Service) Node() core.Node {
return sap.NodeInfo
}
// Chain returns the chain type for this service
func (sap *Service) Chain() shared.ChainType {
return sap.chain
}
// close is used to close all listening subscriptions
// close needs to be called with subscription access locked
func (sap *Service) close() {

View File

@ -52,7 +52,7 @@ type ResponseFilterer interface {
// CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper
type CIDRetriever interface {
Retrieve(filter SubscriptionSettings, blockNumber int64) (CIDsForFetching, bool, error)
Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDsForFetching, bool, error)
RetrieveFirstBlockNumber() (int64, error)
RetrieveLastBlockNumber() (int64, error)
RetrieveGapsInData() ([]Gap, error)

View File

@ -31,7 +31,7 @@ type CIDRetriever struct {
}
// RetrieveCIDs mock method
func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
panic("implement me")
}

View File

@ -23,7 +23,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth"
"github.com/vulcanize/vulcanizedb/pkg/watcher/eth"
"github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
)

View File

@ -19,15 +19,21 @@ package eth
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
)
var (
vacuumThreshold int64 = 5000 // dont know how to decided what this should be set to
)
// Repository is the underlying struct for satisfying the shared.Repository interface for eth
type Repository struct {
db *postgres.DB
triggerFunctions [][2]string
deleteCalls int64
}
// NewRepository returns a new eth.Repository that satisfies the shared.Repository interface
@ -35,6 +41,7 @@ func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Reposit
return &Repository{
db: db,
triggerFunctions: triggerFunctions,
deleteCalls: 0,
}
}
@ -46,12 +53,42 @@ func (r *Repository) LoadTriggers() error {
// QueueData puts super node payload data into the db queue
func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error {
panic("implement me")
pgStr := `INSERT INTO eth.queued_data (data, height) VALUES ($1, $2)
ON CONFLICT (height) DO UPDATE SET (data) VALUES ($1)`
_, err := r.db.Exec(pgStr, payload.Data, payload.Height)
return err
}
// GetQueueData grabs super node payload data from the db queue
func (r *Repository) GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) {
panic("implement me")
// GetQueueData grabs a chunk super node payload data from the queue table so that it can
// be forwarded to the ready table
// this is used to make sure we enter data into the ready table in sequential order
// even if we receive data out-of-order
// it returns the new index
// delete the data it retrieves so as to clear the queue
func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) {
r.deleteCalls++
pgStr := `DELETE FROM eth.queued_data
WHERE height = $1
RETURNING *`
var res shared.QueuedData
if err := r.db.Get(&res, pgStr, height); err != nil {
return super_node.SubscriptionPayload{}, height, err
}
payload := super_node.SubscriptionPayload{
Data: res.Data,
Height: res.Height,
Flag: super_node.EmptyFlag,
}
height++
// Periodically clean up space in the queue table
if r.deleteCalls >= vacuumThreshold {
_, err := r.db.Exec(`VACUUM ANALYZE eth.queued_data`)
if err != nil {
logrus.Error(err)
}
r.deleteCalls = 0
}
return payload, height, nil
}
// ReadyData puts super node payload data in the tables ready for processing by trigger functions

View File

@ -19,6 +19,9 @@ package watcher
import (
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus"
@ -84,7 +87,11 @@ func (s *Service) Init() error {
// Watch is the top level loop for watching super node
func (s *Service) Watch(wg *sync.WaitGroup) error {
sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, s.WatcherConfig.SubscriptionConfig)
rlpConfig, err := rlp.EncodeToBytes(s.WatcherConfig.SubscriptionConfig)
if err != nil {
return err
}
sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, rlpConfig)
if err != nil {
return err
}
@ -108,8 +115,13 @@ func (s *Service) Watch(wg *sync.WaitGroup) error {
// combinedQueuing assumes data is not necessarily going to come in linear order
// this is true when we are backfilling and streaming at the head or when we are
// only streaming at the head since reorgs can occur
// NOTE: maybe we should push everything to the wait queue, otherwise the index could be shifted as we retrieve data from it
func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
wg.Add(1)
// this goroutine is responsible for allocating incoming data to the ready or wait queue
// depending on if it is at the current index or not
forwardQuit := make(chan bool)
go func() {
for {
select {
@ -120,7 +132,8 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
continue
}
if payload.Height == atomic.LoadInt64(s.payloadIndex) {
// If the data is at our current index it is ready to be processed; add it to the ready data queue
// If the data is at our current index it is ready to be processed
// add it to the ready data queue and increment the index
if err := s.Repository.ReadyData(payload); err != nil {
logrus.Error(err)
}
@ -134,15 +147,41 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
logrus.Error(err)
case <-s.QuitChan:
logrus.Info("Watcher shutting down")
forwardQuit <- true
wg.Done()
return
}
}
}()
ticker := time.NewTicker(5 * time.Second)
// this goroutine is responsible for moving data from the wait queue to the ready queue
// preserving the correct order and alignment with the current index
go func() {
for {
select {
case <-ticker.C:
// retrieve queued data, in order, and forward it to the ready queue
index := atomic.LoadInt64(s.payloadIndex)
queueData, newIndex, err := s.Repository.GetQueueData(index)
if err != nil {
logrus.Error(err)
continue
}
atomic.StoreInt64(s.payloadIndex, newIndex)
if err := s.Repository.ReadyData(queueData); err != nil {
logrus.Error(err)
}
case <-forwardQuit:
return
default:
// do nothing, wait til next tick
}
}
}()
}
// backFillOnlyQueuing assumes the data is coming in contiguously and behind the head
// it puts all data on the ready queue
// backFillOnlyQueuing assumes the data is coming in contiguously from behind the head
// it puts all data directly into the ready queue
// it continues until the watcher is told to quit or we receive notification that the backfill is finished
func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
wg.Add(1)

View File

@ -20,18 +20,17 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// Repository is the interface for the Postgres database
type Repository interface {
LoadTriggers() error
QueueData(payload super_node.SubscriptionPayload) error
GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error)
GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error)
ReadyData(payload super_node.SubscriptionPayload) error
}
// SuperNodeStreamer is the interface for streaming data from a vulcanizeDB super node
type SuperNodeStreamer interface {
Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error)
Stream(payloadChan chan super_node.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error)
}

View File

@ -0,0 +1,24 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
// QueuedData is the db model for queued data
type QueuedData struct {
ID int64 `db:"id"`
Data []byte `db:"data"`
Height int64 `db:"height"`
}