changes to super node to improve compatibility with watcher

Ian Norden 2020-02-20 16:12:52 -06:00
parent f0c5ff8077
commit fb360d8562
56 changed files with 546 additions and 821 deletions

View File

@@ -48,7 +48,7 @@ func NewBtcHeader(header *wire.BlockHeader) (*BtcHeader, error) {
return nil, err
}
rawdata := w.Bytes()
- c, err := rawdataToCid(MBitcoinHeader, rawdata, mh.DBL_SHA2_256)
+ c, err := RawdataToCid(MBitcoinHeader, rawdata, mh.DBL_SHA2_256)
if err != nil {
return nil, err
}

View File

@@ -33,7 +33,7 @@ func NewBtcTx(tx *wire.MsgTx) (*BtcTx, error) {
return nil, err
}
rawdata := w.Bytes()
- c, err := rawdataToCid(MBitcoinTx, rawdata, mh.DBL_SHA2_256)
+ c, err := RawdataToCid(MBitcoinTx, rawdata, mh.DBL_SHA2_256)
if err != nil {
return nil, err
}

View File

@@ -49,7 +49,7 @@ func NewEthHeader(header *types.Header) (*EthHeader, error) {
if err != nil {
return nil, err
}
- c, err := rawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
+ c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}

View File

@@ -48,7 +48,7 @@ func NewReceipt(receipt *types.ReceiptForStorage) (*EthReceipt, error) {
if err != nil {
return nil, err
}
- c, err := rawdataToCid(MEthTxReceipt, receiptRLP, mh.KECCAK_256)
+ c, err := RawdataToCid(MEthTxReceipt, receiptRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}

View File

@@ -41,7 +41,7 @@ var _ node.Node = (*EthStateTrie)(nil)
// FromStateTrieRLP takes the RLP bytes of an ethereum
// state trie node to return it as an IPLD node for further processing.
func FromStateTrieRLP(stateNodeRLP []byte) (*EthStateTrie, error) {
- c, err := rawdataToCid(MEthStateTrie, stateNodeRLP, mh.KECCAK_256)
+ c, err := RawdataToCid(MEthStateTrie, stateNodeRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}

View File

@@ -41,7 +41,7 @@ var _ node.Node = (*EthStorageTrie)(nil)
// FromStorageTrieRLP takes the RLP bytes of an ethereum
// storage trie node to return it as an IPLD node for further processing.
func FromStorageTrieRLP(storageNodeRLP []byte) (*EthStorageTrie, error) {
- c, err := rawdataToCid(MEthStorageTrie, storageNodeRLP, mh.KECCAK_256)
+ c, err := RawdataToCid(MEthStorageTrie, storageNodeRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}

View File

@@ -50,7 +50,7 @@ func NewEthTx(tx *types.Transaction) (*EthTx, error) {
if err != nil {
return nil, err
}
- c, err := rawdataToCid(MEthTx, txRLP, mh.KECCAK_256)
+ c, err := RawdataToCid(MEthTx, txRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}

View File

@@ -40,9 +40,9 @@ const (
MBitcoinTx = 0xb1
)
- // rawdataToCid takes the desired codec and a slice of bytes
+ // RawdataToCid takes the desired codec and a slice of bytes
// and returns the proper cid of the object.
- func rawdataToCid(codec uint64, rawdata []byte, multiHash uint64) (cid.Cid, error) {
+ func RawdataToCid(codec uint64, rawdata []byte, multiHash uint64) (cid.Cid, error) {
c, err := cid.Prefix{
Codec: codec,
Version: 1,
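The helper is now exported so that code outside this package (for example the watcher) can derive the same CIDs. A minimal, self-contained sketch of that usage, assuming the usual MhLength: -1 default in the prefix and the standard eth-block multicodec value 0x90 (neither appears in the hunk above):

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

// rawdataToCid mirrors the helper from the diff: it derives a CIDv1 for the
// given codec and raw bytes using the requested multihash function.
func rawdataToCid(codec uint64, rawdata []byte, multiHash uint64) (cid.Cid, error) {
	return cid.Prefix{
		Codec:    codec,
		Version:  1,
		MhType:   multiHash,
		MhLength: -1, // assumption: use the default digest length for the hash
	}.Sum(rawdata)
}

func main() {
	const mEthHeader = 0x90 // eth-block codec from the multicodec table
	c, err := rawdataToCid(mEthHeader, []byte{0xc0}, mh.KECCAK_256)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.String())
}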

View File

@@ -52,7 +52,7 @@ type BackFillService struct {
// Interface for fetching payloads over at historical blocks; over http
Fetcher shared.PayloadFetcher
// Channel for forwarding backfill payloads to the ScreenAndServe process
- ScreenAndServeChan chan shared.StreamedIPLDs
+ ScreenAndServeChan chan shared.ConvertedData
// Check frequency
GapCheckFrequency time.Duration
// Size of batch fetches
@@ -62,7 +62,7 @@ type BackFillService struct {
}
// NewBackFillService returns a new BackFillInterface
- func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.StreamedIPLDs) (BackFillInterface, error) {
+ func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath)
if err != nil {
return nil, err

View File

@@ -41,10 +41,10 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
- ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
+ ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload, mocks.MockConvertedPayload},
ReturnErr: nil,
}
- mockRetriever := &mocks2.MockCIDRetriever{
+ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{
{
@@ -52,7 +52,7 @@ var _ = Describe("BackFiller", func() {
},
},
}
- mockFetcher := &mocks2.IPLDFetcher{
+ mockFetcher := &mocks2.PayloadFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
101: mocks.MockStateDiffPayload,
@@ -77,8 +77,8 @@ var _ = Describe("BackFiller", func() {
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
- Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
- Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
+ Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
@@ -96,10 +96,10 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
- ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload},
+ ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload},
ReturnErr: nil,
}
- mockRetriever := &mocks2.MockCIDRetriever{
+ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{
{
@@ -107,7 +107,7 @@ var _ = Describe("BackFiller", func() {
},
},
}
- mockFetcher := &mocks2.IPLDFetcher{
+ mockFetcher := &mocks2.PayloadFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
},
@@ -130,7 +130,7 @@ var _ = Describe("BackFiller", func() {
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1))
- Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockRetriever.CalledTimes).To(Equal(1))
@@ -147,14 +147,14 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
- ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
+ ReturnIPLDPayload: []eth.ConvertedPayload{mocks.MockConvertedPayload, mocks.MockConvertedPayload},
ReturnErr: nil,
}
- mockRetriever := &mocks2.MockCIDRetriever{
+ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 3,
GapsToRetrieve: []shared.Gap{},
}
- mockFetcher := &mocks2.IPLDFetcher{
+ mockFetcher := &mocks2.PayloadFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
1: mocks.MockStateDiffPayload,
2: mocks.MockStateDiffPayload,
@@ -179,8 +179,8 @@ var _ = Describe("BackFiller", func() {
Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload))
Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2))
- Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockIPLDPayload))
- Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockIPLDPayload))
+ Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload))
+ Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload))
Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(2))
Expect(mockConverter.PassedStatediffPayload[0]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))

View File

@@ -40,7 +40,7 @@ func NewPayloadConverter(chainConfig *chaincfg.Params) *PayloadConverter {
// Convert method is used to convert a bitcoin BlockPayload to an IPLDPayload
// Satisfies the shared.PayloadConverter interface
- func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
+ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
btcBlockPayload, ok := payload.(BlockPayload)
if !ok {
return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload)
@@ -87,7 +87,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
}
txMeta[i] = txModel
}
- return IPLDPayload{
+ return ConvertedPayload{
BlockPayload: btcBlockPayload,
TxMetaData: txMeta,
}, nil

View File

@@ -31,9 +31,9 @@ var _ = Describe("Converter", func() {
converter := btc.NewPayloadConverter(&chaincfg.MainNetParams)
payload, err := converter.Convert(mocks.MockBlockPayload)
Expect(err).ToNot(HaveOccurred())
- convertedPayload, ok := payload.(btc.IPLDPayload)
+ convertedPayload, ok := payload.(btc.ConvertedPayload)
Expect(ok).To(BeTrue())
- Expect(convertedPayload).To(Equal(mocks.MockIPLDPayload))
+ Expect(convertedPayload).To(Equal(mocks.MockConvertedPayload))
Expect(convertedPayload.BlockHeight).To(Equal(mocks.MockBlockHeight))
Expect(convertedPayload.Header).To(Equal(&mocks.MockBlock.Header))
Expect(convertedPayload.Txs).To(Equal(mocks.MockTransactions))

View File

@@ -33,37 +33,37 @@ func NewResponseFilterer() *ResponseFilterer {
}
// Filter is used to filter through btc data to extract and package requested data into a Payload
- func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) {
+ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) {
btcFilters, ok := filter.(*SubscriptionSettings)
if !ok {
- return StreamResponse{}, fmt.Errorf("btc filterer expected filter type %T got %T", &SubscriptionSettings{}, filter)
+ return IPLDs{}, fmt.Errorf("btc filterer expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
- btcPayload, ok := payload.(IPLDPayload)
+ btcPayload, ok := payload.(ConvertedPayload)
if !ok {
- return StreamResponse{}, fmt.Errorf("btc filterer expected payload type %T got %T", IPLDPayload{}, payload)
+ return IPLDs{}, fmt.Errorf("btc filterer expected payload type %T got %T", ConvertedPayload{}, payload)
}
height := int64(btcPayload.BlockPayload.BlockHeight)
if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) {
- response := new(StreamResponse)
+ response := new(IPLDs)
if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil {
- return StreamResponse{}, err
+ return IPLDs{}, err
}
if err := s.filterTransactions(btcFilters.TxFilter, response, btcPayload); err != nil {
- return StreamResponse{}, err
+ return IPLDs{}, err
}
response.BlockNumber = big.NewInt(height)
return *response, nil
}
- return StreamResponse{}, nil
+ return IPLDs{}, nil
}
- func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error {
+ func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error {
if !headerFilter.Off {
headerBuffer := new(bytes.Buffer)
if err := payload.Header.Serialize(headerBuffer); err != nil {
return err
}
- response.SerializedHeaders = append(response.SerializedHeaders, headerBuffer.Bytes())
+ response.Headers = append(response.Headers, headerBuffer.Bytes())
}
return nil
}
@@ -75,15 +75,16 @@ func checkRange(start, end, actual int64) bool {
return false
}
- func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) error {
+ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) error {
if !trxFilter.Off {
+ response.Transactions = make([][]byte, 0, len(payload.TxMetaData))
for i, txMeta := range payload.TxMetaData {
if checkTransaction(txMeta, trxFilter) {
trxBuffer := new(bytes.Buffer)
if err := payload.Txs[i].MsgTx().Serialize(trxBuffer); err != nil {
return err
}
- response.SerializedTxs = append(response.SerializedTxs, trxBuffer.Bytes())
+ response.Transactions = append(response.Transactions, trxBuffer.Bytes())
}
}
}

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -52,13 +52,13 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) {
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
- func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) {
+ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
- iplds := new(IPLDWrapper)
+ iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers)
@@ -74,42 +74,50 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, e
// FetchHeaders fetches headers
// It uses the f.fetchBatch method
- func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) {
+ func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([][]byte, error) {
log.Debug("fetching header iplds")
- headerCids := make([]cid.Cid, 0, len(cids))
- for _, c := range cids {
+ headerCids := make([]cid.Cid, len(cids))
+ for i, c := range cids {
dc, err := cid.Decode(c.CID)
if err != nil {
return nil, err
}
- headerCids = append(headerCids, dc)
+ headerCids[i] = dc
}
headers := f.fetchBatch(headerCids)
- if len(headers) != len(headerCids) {
- log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
- return headers, errUnexpectedNumberOfIPLDs
- }
- return headers, nil
+ headersBytes := make([][]byte, len(headers))
+ for i, header := range headers {
+ headersBytes[i] = header.RawData()
+ }
+ if len(headersBytes) != len(headerCids) {
+ log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
+ return headersBytes, errUnexpectedNumberOfIPLDs
+ }
+ return headersBytes, nil
}
// FetchTrxs fetches transactions
// It uses the f.fetchBatch method
- func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]blocks.Block, error) {
+ func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([][]byte, error) {
log.Debug("fetching transaction iplds")
- trxCids := make([]cid.Cid, 0, len(cids))
- for _, c := range cids {
+ trxCids := make([]cid.Cid, len(cids))
+ for i, c := range cids {
dc, err := cid.Decode(c.CID)
if err != nil {
return nil, err
}
- trxCids = append(trxCids, dc)
+ trxCids[i] = dc
}
trxs := f.fetchBatch(trxCids)
- if len(trxs) != len(trxCids) {
- log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids))
- return trxs, errUnexpectedNumberOfIPLDs
- }
- return trxs, nil
+ trxBytes := make([][]byte, len(trxs))
+ for i, trx := range trxs {
+ trxBytes[i] = trx.RawData()
+ }
+ if len(trxBytes) != len(trxCids) {
+ log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids))
+ return trxBytes, errUnexpectedNumberOfIPLDs
+ }
+ return trxBytes, nil
}
// fetch is used to fetch a single cid
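The fetchers now unwrap each IPFS block into its raw bytes before returning, which is what lets the resolver step (deleted below) go away. A minimal, self-contained sketch of that conversion pattern, assuming only the go-block-format package (the fetchBatch plumbing above is not reproduced here):

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
)

// rawDataFromBlocks mirrors the loop the fetchers now run inline: it unwraps
// each fetched IPFS block into its raw byte payload.
func rawDataFromBlocks(blks []blocks.Block) [][]byte {
	out := make([][]byte, len(blks))
	for i, blk := range blks {
		out[i] = blk.RawData()
	}
	return out
}

func main() {
	blk := blocks.NewBlock([]byte("example ipld payload"))
	for _, raw := range rawDataFromBlocks([]blocks.Block{blk}) {
		fmt.Println(len(raw))
	}
}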

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -26,12 +26,12 @@ import (
// PayloadConverter is the underlying struct for the Converter interface
type PayloadConverter struct {
PassedStatediffPayload btc.BlockPayload
- ReturnIPLDPayload btc.IPLDPayload
+ ReturnIPLDPayload btc.ConvertedPayload
ReturnErr error
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
- func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
+ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
stateDiffPayload, ok := payload.(btc.BlockPayload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload)
@@ -43,13 +43,13 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
// IterativePayloadConverter is the underlying struct for the Converter interface
type IterativePayloadConverter struct {
PassedStatediffPayload []btc.BlockPayload
- ReturnIPLDPayload []btc.IPLDPayload
+ ReturnIPLDPayload []btc.ConvertedPayload
ReturnErr error
iteration int
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
- func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
+ func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
stateDiffPayload, ok := payload.(btc.BlockPayload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload)

View File

@@ -26,16 +26,16 @@ import (
// IPLDPublisher is the underlying struct for the Publisher interface
type IPLDPublisher struct {
- PassedIPLDPayload btc.IPLDPayload
+ PassedIPLDPayload btc.ConvertedPayload
ReturnCIDPayload *btc.CIDPayload
ReturnErr error
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
- func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
- ipldPayload, ok := payload.(btc.IPLDPayload)
+ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
+ ipldPayload, ok := payload.(btc.ConvertedPayload)
if !ok {
- return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload)
+ return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.ConvertedPayload{}, payload)
}
pub.PassedIPLDPayload = ipldPayload
return pub.ReturnCIDPayload, pub.ReturnErr
@@ -43,17 +43,17 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
type IterativeIPLDPublisher struct {
- PassedIPLDPayload []btc.IPLDPayload
+ PassedIPLDPayload []btc.ConvertedPayload
ReturnCIDPayload []*btc.CIDPayload
ReturnErr error
iteration int
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
- func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
- ipldPayload, ok := payload.(btc.IPLDPayload)
+ func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
+ ipldPayload, ok := payload.(btc.ConvertedPayload)
if !ok {
- return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload)
+ return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.ConvertedPayload{}, payload)
}
pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload)
if len(pub.ReturnCIDPayload) < pub.iteration+1 {

View File

@@ -677,7 +677,7 @@ var (
Timestamp: MockBlock.Header.Timestamp.UnixNano(),
Bits: MockBlock.Header.Bits,
}
- MockIPLDPayload = btc.IPLDPayload{
+ MockConvertedPayload = btc.ConvertedPayload{
BlockPayload: MockBlockPayload,
TxMetaData: MockTxsMetaData,
}

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -48,10 +48,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
- func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
- ipldPayload, ok := payload.(IPLDPayload)
+ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
+ ipldPayload, ok := payload.(ConvertedPayload)
if !ok {
- return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload)
+ return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload)
}
// Process and publish headers
headerCid, err := pub.publishHeader(ipldPayload.Header)

View File

@@ -44,7 +44,7 @@ var _ = Describe("Publisher", func() {
HeaderPutter: mockHeaderDagPutter,
TransactionPutter: mockTrxDagPutter,
}
- payload, err := publisher.Publish(mocks.MockIPLDPayload)
+ payload, err := publisher.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
cidPayload, ok := payload.(*btc.CIDPayload)
Expect(ok).To(BeTrue())

View File

@@ -1,54 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ipfs/go-block-format"
)
// IPLDResolver satisfies the IPLDResolver interface for bitcoin
type IPLDResolver struct{}
// NewIPLDResolver returns a pointer to an IPLDResolver which satisfies the IPLDResolver interface
func NewIPLDResolver() *IPLDResolver {
return &IPLDResolver{}
}
// Resolve is the exported method for resolving all of the BTC IPLDs packaged in an IpfsBlockWrapper
func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) {
ipfsBlocks, ok := iplds.(*IPLDWrapper)
if !ok {
return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds)
}
return StreamResponse{
BlockNumber: ipfsBlocks.BlockNumber,
SerializedHeaders: eir.resolve(ipfsBlocks.Headers),
SerializedTxs: eir.resolve(ipfsBlocks.Transactions),
}, nil
}
func (eir *IPLDResolver) resolve(iplds []blocks.Block) [][]byte {
rlps := make([][]byte, 0, len(iplds))
for _, ipld := range iplds {
rlps = append(rlps, ipld.RawData())
}
return rlps
}

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -1,17 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc

View File

@@ -17,12 +17,10 @@
package btc
import (
- "encoding/json"
"math/big"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
- "github.com/ipfs/go-block-format"
)
// BlockPayload packages the block and tx data received from block connection notifications
@@ -32,16 +30,16 @@ type BlockPayload struct {
Txs []*btcutil.Tx
}
- // IPLDPayload is a custom type which packages raw BTC data for publishing to IPFS and filtering to subscribers
+ // ConvertedPayload is a custom type which packages raw BTC data for publishing to IPFS and filtering to subscribers
// Returned by PayloadConverter
// Passed to IPLDPublisher and ResponseFilterer
- type IPLDPayload struct {
+ type ConvertedPayload struct {
BlockPayload
TxMetaData []TxModelWithInsAndOuts
}
// Height satisfies the StreamedIPLDs interface
- func (i IPLDPayload) Height() int64 {
+ func (i ConvertedPayload) Height() int64 {
return i.BlockPayload.BlockHeight
}
@@ -62,46 +60,15 @@ type CIDWrapper struct {
Transactions []TxModel
}
- // IPLDWrapper is used to package raw IPLD block data fetched from IPFS
- // Returned by IPLDFetcher
- // Passed to IPLDResolver
- type IPLDWrapper struct {
+ // IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server
+ // Returned by IPLDFetcher and ResponseFilterer
+ type IPLDs struct {
BlockNumber *big.Int
- Headers []blocks.Block
- Transactions []blocks.Block
+ Headers [][]byte
+ Transactions [][]byte
}
- // StreamResponse holds the data streamed from the super node eth service to the requesting clients
- // Returned by IPLDResolver and ResponseFilterer
- // Passed to client subscriptions
- type StreamResponse struct {
- BlockNumber *big.Int `json:"blockNumber"`
- SerializedHeaders [][]byte `json:"headerBytes"`
- SerializedTxs [][]byte `json:"transactionBytes"`
- encoded []byte
- err error
- }
- // Height satisfies the ServerResponse interface
- func (sr StreamResponse) Height() int64 {
- return sr.BlockNumber.Int64()
- }
- func (sr *StreamResponse) ensureEncoded() {
- if sr.encoded == nil && sr.err == nil {
- sr.encoded, sr.err = json.Marshal(sr)
- }
- }
- // Length to implement Encoder interface for StateDiff
- func (sr *StreamResponse) Length() int {
- sr.ensureEncoded()
- return len(sr.encoded)
- }
- // Encode to implement Encoder interface for StateDiff
- func (sr *StreamResponse) Encode() ([]byte, error) {
- sr.ensureEncoded()
- return sr.encoded, sr.err
- }
+ // Height satisfies the StreamedIPLDs interface
+ func (i IPLDs) Height() int64 {
+ return i.BlockNumber.Int64()
+ }
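With IPLDWrapper and StreamResponse collapsed into the single IPLDs type, the fetcher and the filterer now hand subscribers the same raw-byte bundle. A small standalone sketch of constructing and consuming such a value (the type is redeclared locally here, and the byte slices are placeholders, not real serialized data):

package main

import (
	"fmt"
	"math/big"
)

// IPLDs mirrors the btc type above: the block number plus raw serialized
// header and transaction bytes.
type IPLDs struct {
	BlockNumber  *big.Int
	Headers      [][]byte
	Transactions [][]byte
}

// Height satisfies the same height-reporting interface as in the diff.
func (i IPLDs) Height() int64 {
	return i.BlockNumber.Int64()
}

func main() {
	iplds := IPLDs{
		BlockNumber:  big.NewInt(100),
		Headers:      [][]byte{{0xde, 0xad}}, // placeholder serialized header
		Transactions: [][]byte{{0xbe, 0xef}}, // placeholder serialized tx
	}
	fmt.Println(iplds.Height(), len(iplds.Headers), len(iplds.Transactions))
}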

View File

@@ -148,18 +148,6 @@ func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPubli
}
}
- // NewIPLDResolver constructs an IPLDResolver for the provided chain type
- func NewIPLDResolver(chain shared.ChainType) (shared.IPLDResolver, error) {
- switch chain {
- case shared.Ethereum:
- return eth.NewIPLDResolver(), nil
- case shared.Bitcoin:
- return btc.NewIPLDResolver(), nil
- default:
- return nil, fmt.Errorf("invalid chain %s for resolver constructor", chain.String())
- }
- }
// NewPublicAPI constructs a PublicAPI for the provided chain type
func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) {
switch chain {

View File

@@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/ipfs/go-block-format"
)
// APIName is the namespace for the super node's eth api
@@ -48,7 +47,7 @@ func NewPublicEthAPI(b *Backend) *PublicEthAPI {
// BlockNumber returns the block number of the chain head.
func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 {
- number, _ := pea.b.retriever.RetrieveLastBlockNumber()
+ number, _ := pea.b.Retriever.RetrieveLastBlockNumber()
return hexutil.Uint64(number)
}
@@ -74,20 +73,20 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
Contracts: addrStrs,
Topics: topicStrSets,
}
- tx, err := pea.b.db.Beginx()
+ tx, err := pea.b.DB.Beginx()
if err != nil {
return nil, err
}
// If we have a blockhash to filter on, fire off single retrieval query
if crit.BlockHash != nil {
- rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil)
+ rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
- rctIPLDs, err := pea.b.fetcher.FetchRcts(rctCIDs)
+ rctIPLDs, err := pea.b.Fetcher.FetchRcts(rctCIDs)
if err != nil {
return nil, err
}
@@ -98,14 +97,14 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
startingBlock := crit.FromBlock
endingBlock := crit.ToBlock
if startingBlock == nil {
- startingBlockInt, err := pea.b.retriever.RetrieveFirstBlockNumber()
+ startingBlockInt, err := pea.b.Retriever.RetrieveFirstBlockNumber()
if err != nil {
return nil, err
}
startingBlock = big.NewInt(startingBlockInt)
}
if endingBlock == nil {
- endingBlockInt, err := pea.b.retriever.RetrieveLastBlockNumber()
+ endingBlockInt, err := pea.b.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
@@ -115,7 +114,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
end := endingBlock.Int64()
allRctCIDs := make([]ReceiptModel, 0)
for i := start; i <= end; i++ {
- rctCIDs, err := pea.b.retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
+ rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
if err != nil {
return nil, err
}
@@ -124,7 +123,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
if err := tx.Commit(); err != nil {
return nil, err
}
- rctIPLDs, err := pea.b.fetcher.FetchRcts(allRctCIDs)
+ rctIPLDs, err := pea.b.Fetcher.FetchRcts(allRctCIDs)
if err != nil {
return nil, err
}
@@ -181,10 +180,10 @@ func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.H
}
// extractLogsOfInterest returns logs from the receipt IPLD
- func extractLogsOfInterest(rctIPLDs []blocks.Block, wantedTopics [][]string) ([]*types.Log, error) {
+ func extractLogsOfInterest(rctIPLDs [][]byte, wantedTopics [][]string) ([]*types.Log, error) {
var logs []*types.Log
for _, rctIPLD := range rctIPLDs {
- rctRLP := rctIPLD.RawData()
+ rctRLP := rctIPLD
var rct types.Receipt
if err := rlp.DecodeBytes(rctRLP, &rct); err != nil {
return nil, err

View File

@@ -36,9 +36,9 @@ var (
)
type Backend struct {
- retriever *CIDRetriever
- fetcher *IPLDFetcher
- db *postgres.DB
+ Retriever *CIDRetriever
+ Fetcher *IPLDFetcher
+ DB *postgres.DB
}
func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) {
@@ -48,9 +48,9 @@ func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) {
return nil, err
}
return &Backend{
- retriever: r,
- fetcher: f,
- db: db,
+ Retriever: r,
+ Fetcher: f,
+ DB: db,
}, nil
}
@@ -58,7 +58,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
number := blockNumber.Int64()
var err error
if blockNumber == rpc.LatestBlockNumber {
- number, err = b.retriever.RetrieveLastBlockNumber()
+ number, err = b.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
@@ -67,11 +67,11 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
return nil, errPendingBlockNumber
}
// Retrieve the CIDs for headers at this height
- tx, err := b.db.Beginx()
+ tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
- headerCids, err := b.retriever.RetrieveHeaderCIDs(tx, number)
+ headerCids, err := b.Retriever.RetrieveHeaderCIDs(tx, number)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
@@ -86,7 +86,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
return nil, fmt.Errorf("header at block %d is not available", number)
}
// Fetch the header IPLDs for those CIDs
- headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCids[0]})
+ headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCids[0]})
if err != nil {
return nil, err
}
@@ -94,7 +94,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
// We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already
// confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing
header := new(types.Header)
- if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil {
+ if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil {
return nil, err
}
return header, nil
@@ -105,7 +105,7 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) {
pgStr := `SELECT header_cids.td FROM header_cids
WHERE header_cids.block_hash = $1`
var tdStr string
- err := b.db.Select(&tdStr, pgStr, blockHash.String())
+ err := b.DB.Select(&tdStr, pgStr, blockHash.String())
if err != nil {
return nil, err
}
@@ -118,11 +118,11 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) {
// GetLogs returns all the logs for the given block hash
func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
- tx, err := b.db.Beginx()
+ tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
- receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil)
+ receiptCIDs, err := b.Retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
@@ -135,14 +135,14 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
if len(receiptCIDs) == 0 {
return nil, nil
}
- receiptIPLDs, err := b.fetcher.FetchRcts(receiptCIDs)
+ receiptIPLDs, err := b.Fetcher.FetchRcts(receiptCIDs)
if err != nil {
return nil, err
}
logs := make([][]*types.Log, len(receiptIPLDs))
for i, rctIPLD := range receiptIPLDs {
var rct types.Receipt
- if err := rlp.DecodeBytes(rctIPLD.RawData(), &rct); err != nil {
+ if err := rlp.DecodeBytes(rctIPLD, &rct); err != nil {
return nil, err
}
logs[i] = rct.Logs
@@ -157,7 +157,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
number := blockNumber.Int64()
var err error
if blockNumber == rpc.LatestBlockNumber {
- number, err = b.retriever.RetrieveLastBlockNumber()
+ number, err = b.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
@@ -166,54 +166,54 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
return nil, errPendingBlockNumber
}
// Retrieve all the CIDs for the block
- headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByNumber(number)
+ headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByNumber(number)
if err != nil {
return nil, err
}
// Fetch and decode the header IPLD
- headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID})
+ headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID})
if err != nil {
return nil, err
}
var header *types.Header
- if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil {
+ if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs
- uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs)
+ uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs)
if err != nil {
return nil, err
}
var uncles []*types.Header
for _, uncleIPLD := range uncleIPLDs {
var uncle *types.Header
- if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil {
+ if err := rlp.DecodeBytes(uncleIPLD, uncle); err != nil {
return nil, err
}
uncles = append(uncles, uncle)
}
// Fetch and decode the transaction IPLDs
- txIPLDs, err := b.fetcher.FetchTrxs(txCIDs)
+ txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs)
if err != nil {
return nil, err
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx *types.Transaction
- if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil {
+ if err := rlp.DecodeBytes(txIPLD, tx); err != nil {
return nil, err
}
transactions = append(transactions, tx)
}
// Fetch and decode the receipt IPLDs
- rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs)
+ rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs)
if err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt *types.Receipt
- if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil {
+ if err := rlp.DecodeBytes(rctIPLD, receipt); err != nil {
return nil, err
}
receipts = append(receipts, receipt)
@@ -226,54 +226,54 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
// detail, otherwise only the transaction hash is returned.
func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
// Retrieve all the CIDs for the block
- headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByHash(hash)
+ headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.Retriever.RetrieveBlockByHash(hash)
if err != nil {
return nil, err
}
// Fetch and decode the header IPLD
- headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID})
+ headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID})
if err != nil {
return nil, err
}
var header *types.Header
- if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil {
+ if err := rlp.DecodeBytes(headerIPLDs[0], header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs
- uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs)
+ uncleIPLDs, err := b.Fetcher.FetchUncles(uncleCIDs)
if err != nil {
return nil, err
}
var uncles []*types.Header
for _, uncleIPLD := range uncleIPLDs {
var uncle *types.Header
- if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil {
+ if err := rlp.DecodeBytes(uncleIPLD, uncle); err != nil {
return nil, err
}
uncles = append(uncles, uncle)
}
// Fetch and decode the transaction IPLDs
- txIPLDs, err := b.fetcher.FetchTrxs(txCIDs)
+ txIPLDs, err := b.Fetcher.FetchTrxs(txCIDs)
if err != nil {
return nil, err
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx *types.Transaction
- if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil {
+ if err := rlp.DecodeBytes(txIPLD, tx); err != nil {
return nil, err
}
transactions = append(transactions, tx)
}
// Fetch and decode the receipt IPLDs
- rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs)
+ rctIPLDs, err := b.Fetcher.FetchRcts(rctCIDs)
if err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt *types.Receipt
- if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil {
+ if err := rlp.DecodeBytes(rctIPLD, receipt); err != nil {
return nil, err
}
receipts = append(receipts, receipt)
@@ -295,15 +295,15 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type
BlockHash string `db:"block_hash"`
BlockNumber int64 `db:"block_number"`
}
- if err := b.db.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil {
+ if err := b.DB.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil {
return nil, common.Hash{}, 0, 0, err
}
- txIPLD, err := b.fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}})
+ txIPLD, err := b.Fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}})
if err != nil {
return nil, common.Hash{}, 0, 0, err
}
var transaction *types.Transaction
- if err := rlp.DecodeBytes(txIPLD[0].RawData(), transaction); err != nil {
+ if err := rlp.DecodeBytes(txIPLD[0], transaction); err != nil {
return nil, common.Hash{}, 0, 0, err
}
return transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), nil

View File

@@ -42,7 +42,7 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter {
// Convert method is used to convert a eth statediff.Payload to an IPLDPayload
// Satisfies the shared.PayloadConverter interface
- func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
+ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
stateDiffPayload, ok := payload.(statediff.Payload)
if !ok {
return nil, fmt.Errorf("eth converter: expected payload type %T got %T", statediff.Payload{}, payload)
@@ -53,7 +53,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
return nil, err
}
trxLen := len(block.Transactions())
- convertedPayload := IPLDPayload{
+ convertedPayload := ConvertedPayload{
TotalDifficulty: stateDiffPayload.TotalDifficulty,
Block: block,
TxMetaData: make([]TxModel, 0, trxLen),

View File

@ -32,7 +32,7 @@ var _ = Describe("Converter", func() {
converter := eth.NewPayloadConverter(params.MainnetChainConfig) converter := eth.NewPayloadConverter(params.MainnetChainConfig)
payload, err := converter.Convert(mocks.MockStateDiffPayload) payload, err := converter.Convert(mocks.MockStateDiffPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
convertedPayload, ok := payload.(eth.IPLDPayload) convertedPayload, ok := payload.(eth.ConvertedPayload)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(convertedPayload.Block.Number().String()).To(Equal(mocks.BlockNumber.String())) Expect(convertedPayload.Block.Number().String()).To(Equal(mocks.BlockNumber.String()))
Expect(convertedPayload.Block.Hash().String()).To(Equal(mocks.MockBlock.Hash().String())) Expect(convertedPayload.Block.Hash().String()).To(Equal(mocks.MockBlock.Hash().String()))

View File

@ -37,58 +37,58 @@ func NewResponseFilterer() *ResponseFilterer {
} }
// Filter is used to filter through eth data to extract and package requested data into a Payload // Filter is used to filter through eth data to extract and package requested data into a Payload
func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.ConvertedData) (shared.IPLDs, error) {
ethFilters, ok := filter.(*SubscriptionSettings) ethFilters, ok := filter.(*SubscriptionSettings)
if !ok { if !ok {
return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) return IPLDs{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter)
} }
ethPayload, ok := payload.(IPLDPayload) ethPayload, ok := payload.(ConvertedPayload)
if !ok { if !ok {
return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload) return IPLDs{}, fmt.Errorf("eth filterer expected payload type %T got %T", ConvertedPayload{}, payload)
} }
if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) { if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) {
response := new(StreamResponse) response := new(IPLDs)
if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil { if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil {
return StreamResponse{}, err return IPLDs{}, err
} }
txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload) txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload)
if err != nil { if err != nil {
return StreamResponse{}, err return IPLDs{}, err
} }
var filterTxs []common.Hash var filterTxs []common.Hash
if ethFilters.ReceiptFilter.MatchTxs { if ethFilters.ReceiptFilter.MatchTxs {
filterTxs = txHashes filterTxs = txHashes
} }
if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil {
return StreamResponse{}, err return IPLDs{}, err
} }
if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil { if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil {
return StreamResponse{}, err return IPLDs{}, err
} }
if err := s.filterStorage(ethFilters.StorageFilter, response, ethPayload); err != nil { if err := s.filterStorage(ethFilters.StorageFilter, response, ethPayload); err != nil {
return StreamResponse{}, err return IPLDs{}, err
} }
response.BlockNumber = ethPayload.Block.Number() response.BlockNumber = ethPayload.Block.Number()
return *response, nil return *response, nil
} }
return StreamResponse{}, nil return IPLDs{}, nil
} }
func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error { func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error {
if !headerFilter.Off { if !headerFilter.Off {
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
if err != nil { if err != nil {
return err return err
} }
response.HeadersRlp = append(response.HeadersRlp, headerRLP) response.Headers = append(response.Headers, headerRLP)
if headerFilter.Uncles { if headerFilter.Uncles {
response.UnclesRlp = make([][]byte, 0, len(payload.Block.Body().Uncles)) response.Uncles = make([][]byte, len(payload.Block.Body().Uncles))
for _, uncle := range payload.Block.Body().Uncles { for i, uncle := range payload.Block.Body().Uncles {
uncleRlp, err := rlp.EncodeToBytes(uncle) uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil { if err != nil {
return err return err
} }
response.UnclesRlp = append(response.UnclesRlp, uncleRlp) response.Uncles[i] = uncleRlp
} }
} }
} }
@ -102,17 +102,20 @@ func checkRange(start, end, actual int64) bool {
return false return false
} }
func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) { func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) var trxHashes []common.Hash
if !trxFilter.Off { if !trxFilter.Off {
trxLen := len(payload.Block.Body().Transactions)
trxHashes = make([]common.Hash, 0, trxLen)
response.Transactions = make([][]byte, 0, trxLen)
for i, trx := range payload.Block.Body().Transactions { for i, trx := range payload.Block.Body().Transactions {
if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
trxBuffer := new(bytes.Buffer) trxBuffer := new(bytes.Buffer)
if err := trx.EncodeRLP(trxBuffer); err != nil { if err := trx.EncodeRLP(trxBuffer); err != nil {
return nil, err return nil, err
} }
response.Transactions = append(response.Transactions, trxBuffer.Bytes())
trxHashes = append(trxHashes, trx.Hash()) trxHashes = append(trxHashes, trx.Hash())
response.TransactionsRlp = append(response.TransactionsRlp, trxBuffer.Bytes())
} }
} }
} }
@ -138,8 +141,9 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s
return false return false
} }
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error { func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off { if !receiptFilter.Off {
response.Receipts = make([][]byte, 0, len(payload.Receipts))
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
// topics is always length 4 // topics is always length 4
topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s}
@ -149,7 +153,7 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *
if err := receiptForStorage.EncodeRLP(receiptBuffer); err != nil { if err := receiptForStorage.EncodeRLP(receiptBuffer); err != nil {
return err return err
} }
response.ReceiptsRlp = append(response.ReceiptsRlp, receiptBuffer.Bytes()) response.Receipts = append(response.Receipts, receiptBuffer.Bytes())
} }
} }
} }
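filerReceipts re-encodes each matching receipt in its storage form before appending it to the flat Receipts slice. A standalone sketch of just that encoding step with a synthetic receipt (the gas value is arbitrary; the snippet only illustrates the pattern):

package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	// Build a synthetic receipt and convert it to its storage representation,
	// mirroring the receiptForStorage.EncodeRLP call in filerReceipts.
	rct := types.NewReceipt(nil, false, 21000)
	storageRct := (*types.ReceiptForStorage)(rct)

	buf := new(bytes.Buffer)
	if err := storageRct.EncodeRLP(buf); err != nil {
		panic(err)
	}
	fmt.Printf("receipt storage RLP is %d bytes\n", buf.Len())
}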
@ -217,9 +221,9 @@ func slicesShareString(slice1, slice2 []string) int {
return 0 return 0
} }
func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *StreamResponse, payload IPLDPayload) error { func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *IPLDs, payload ConvertedPayload) error {
if !stateFilter.Off { if !stateFilter.Off {
response.StateNodesRlp = make(map[common.Hash][]byte) response.StateNodes = make([]StateNode, 0, len(payload.StateNodes))
keyFilters := make([]common.Hash, len(stateFilter.Addresses)) keyFilters := make([]common.Hash, len(stateFilter.Addresses))
for i, addr := range stateFilter.Addresses { for i, addr := range stateFilter.Addresses {
keyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) keyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes())
@ -227,7 +231,11 @@ func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *Stream
for _, stateNode := range payload.StateNodes { for _, stateNode := range payload.StateNodes {
if checkNodeKeys(keyFilters, stateNode.Key) { if checkNodeKeys(keyFilters, stateNode.Key) {
if stateNode.Leaf || stateFilter.IntermediateNodes { if stateNode.Leaf || stateFilter.IntermediateNodes {
response.StateNodesRlp[stateNode.Key] = stateNode.Value response.StateNodes = append(response.StateNodes, StateNode{
StateTrieKey: stateNode.Key,
IPLD: stateNode.Value,
Leaf: stateNode.Leaf,
})
} }
} }
} }
@ -248,9 +256,9 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false return false
} }
func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *StreamResponse, payload IPLDPayload) error { func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error {
if !storageFilter.Off { if !storageFilter.Off {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) response.StorageNodes = make([]StorageNode, 0)
stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses)) stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses))
for i, addr := range storageFilter.Addresses { for i, addr := range storageFilter.Addresses {
stateKeyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()) stateKeyFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes())
@ -261,10 +269,14 @@ func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *
} }
for stateKey, storageNodes := range payload.StorageNodes { for stateKey, storageNodes := range payload.StorageNodes {
if checkNodeKeys(stateKeyFilters, stateKey) { if checkNodeKeys(stateKeyFilters, stateKey) {
response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte)
for _, storageNode := range storageNodes { for _, storageNode := range storageNodes {
if checkNodeKeys(storageKeyFilters, storageNode.Key) { if checkNodeKeys(storageKeyFilters, storageNode.Key) {
response.StorageNodesRlp[stateKey][storageNode.Key] = storageNode.Value response.StorageNodes = append(response.StorageNodes, StorageNode{
StateTrieKey: stateKey,
StorageTrieKey: storageNode.Key,
IPLD: storageNode.Value,
Leaf: storageNode.Leaf,
})
} }
} }
} }
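Both filterState and filterStorage convert the configured addresses into trie keys up front, because the statediff payload keys its nodes by keccak256(address) rather than by the address itself. A tiny sketch of that mapping (the address is a placeholder):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// The filters compare node keys against keccak256(address),
	// the key under which an account lives in the state trie.
	addr := common.HexToAddress("0x0000000000000000000000000000000000000001")
	key := crypto.Keccak256Hash(addr.Bytes())
	fmt.Println("address:       ", addr.Hex())
	fmt.Println("state trie key:", key.Hex())
}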

View File

@ -43,133 +43,141 @@ var _ = Describe("Filterer", func() {
}) })
It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() {
payload, err := filterer.Filter(openFilter, mocks.MockIPLDPayload) payload, err := filterer.Filter(openFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload, ok := payload.(eth.StreamResponse) iplds, ok := payload.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp)) Expect(iplds.Headers).To(Equal(mocks.MockIPLDs.Headers))
var unclesRlp [][]byte var unclesRlp [][]byte
Expect(superNodePayload.UnclesRlp).To(Equal(unclesRlp)) Expect(iplds.Uncles).To(Equal(unclesRlp))
Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) Expect(len(iplds.Transactions)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(shared.ListContainsBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) Expect(shared.ListContainsBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2)) Expect(len(iplds.Receipts)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) Expect(shared.ListContainsBytes(iplds.Receipts, expectedRctForStorageRLP1)).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) Expect(shared.ListContainsBytes(iplds.Receipts, expectedRctForStorageRLP2)).To(BeTrue())
Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2)) Expect(len(iplds.StateNodes)).To(Equal(2))
Expect(superNodePayload.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) for _, stateNode := range iplds.StateNodes {
Expect(superNodePayload.StateNodesRlp[mocks.AnotherContractLeafKey]).To(Equal(mocks.AnotherValueBytes)) Expect(stateNode.Leaf).To(BeTrue())
Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeedNodePayload.StorageNodesRlp)) if stateNode.StateTrieKey == mocks.ContractLeafKey {
Expect(stateNode.IPLD).To(Equal(mocks.ValueBytes))
}
if stateNode.StateTrieKey == mocks.AnotherContractLeafKey {
Expect(stateNode.IPLD).To(Equal(mocks.AnotherValueBytes))
}
}
Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes))
}) })
It("Applies filters from the provided config.Subscription", func() { It("Applies filters from the provided config.Subscription", func() {
payload1, err := filterer.Filter(rctContractFilter, mocks.MockIPLDPayload) payload1, err := filterer.Filter(rctContractFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload1, ok := payload1.(eth.StreamResponse) iplds1, ok := payload1.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0)) Expect(len(iplds1.Headers)).To(Equal(0))
Expect(len(superNodePayload1.UnclesRlp)).To(Equal(0)) Expect(len(iplds1.Uncles)).To(Equal(0))
Expect(len(superNodePayload1.TransactionsRlp)).To(Equal(0)) Expect(len(iplds1.Transactions)).To(Equal(0))
Expect(len(superNodePayload1.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds1.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload1.StateNodesRlp)).To(Equal(0)) Expect(len(iplds1.StateNodes)).To(Equal(0))
Expect(len(superNodePayload1.ReceiptsRlp)).To(Equal(1)) Expect(len(iplds1.Receipts)).To(Equal(1))
Expect(superNodePayload1.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) Expect(iplds1.Receipts[0]).To(Equal(expectedRctForStorageRLP2))
payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockIPLDPayload) payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload2, ok := payload2.(eth.StreamResponse) iplds2, ok := payload2.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0)) Expect(len(iplds2.Headers)).To(Equal(0))
Expect(len(superNodePayload2.UnclesRlp)).To(Equal(0)) Expect(len(iplds2.Uncles)).To(Equal(0))
Expect(len(superNodePayload2.TransactionsRlp)).To(Equal(0)) Expect(len(iplds2.Transactions)).To(Equal(0))
Expect(len(superNodePayload2.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds2.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload2.StateNodesRlp)).To(Equal(0)) Expect(len(iplds2.StateNodes)).To(Equal(0))
Expect(len(superNodePayload2.ReceiptsRlp)).To(Equal(1)) Expect(len(iplds2.Receipts)).To(Equal(1))
Expect(superNodePayload2.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) Expect(iplds2.Receipts[0]).To(Equal(expectedRctForStorageRLP1))
payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockIPLDPayload) payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload3, ok := payload3.(eth.StreamResponse) iplds3, ok := payload3.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0)) Expect(len(iplds3.Headers)).To(Equal(0))
Expect(len(superNodePayload3.UnclesRlp)).To(Equal(0)) Expect(len(iplds3.Uncles)).To(Equal(0))
Expect(len(superNodePayload3.TransactionsRlp)).To(Equal(0)) Expect(len(iplds3.Transactions)).To(Equal(0))
Expect(len(superNodePayload3.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds3.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload3.StateNodesRlp)).To(Equal(0)) Expect(len(iplds3.StateNodes)).To(Equal(0))
Expect(len(superNodePayload3.ReceiptsRlp)).To(Equal(1)) Expect(len(iplds3.Receipts)).To(Equal(1))
Expect(superNodePayload3.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP1)) Expect(iplds3.Receipts[0]).To(Equal(expectedRctForStorageRLP1))
payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockIPLDPayload) payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload4, ok := payload4.(eth.StreamResponse) iplds4, ok := payload4.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0)) Expect(len(iplds4.Headers)).To(Equal(0))
Expect(len(superNodePayload4.UnclesRlp)).To(Equal(0)) Expect(len(iplds4.Uncles)).To(Equal(0))
Expect(len(superNodePayload4.TransactionsRlp)).To(Equal(0)) Expect(len(iplds4.Transactions)).To(Equal(0))
Expect(len(superNodePayload4.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds4.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload4.StateNodesRlp)).To(Equal(0)) Expect(len(iplds4.StateNodes)).To(Equal(0))
Expect(len(superNodePayload4.ReceiptsRlp)).To(Equal(1)) Expect(len(iplds4.Receipts)).To(Equal(1))
Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) Expect(iplds4.Receipts[0]).To(Equal(expectedRctForStorageRLP2))
payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockIPLDPayload) payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload5, ok := payload5.(eth.StreamResponse) iplds5, ok := payload5.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0)) Expect(len(iplds5.Headers)).To(Equal(0))
Expect(len(superNodePayload5.UnclesRlp)).To(Equal(0)) Expect(len(iplds5.Uncles)).To(Equal(0))
Expect(len(superNodePayload5.TransactionsRlp)).To(Equal(2)) Expect(len(iplds5.Transactions)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload5.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds5.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload5.StateNodesRlp)).To(Equal(0)) Expect(len(iplds5.StateNodes)).To(Equal(0))
Expect(len(superNodePayload5.ReceiptsRlp)).To(Equal(2)) Expect(len(iplds5.Receipts)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP1)).To(BeTrue()) Expect(shared.ListContainsBytes(iplds5.Receipts, expectedRctForStorageRLP1)).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload5.ReceiptsRlp, expectedRctForStorageRLP2)).To(BeTrue()) Expect(shared.ListContainsBytes(iplds5.Receipts, expectedRctForStorageRLP2)).To(BeTrue())
payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockIPLDPayload) payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload6, ok := payload6.(eth.StreamResponse) iplds6, ok := payload6.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0)) Expect(len(iplds6.Headers)).To(Equal(0))
Expect(len(superNodePayload6.UnclesRlp)).To(Equal(0)) Expect(len(iplds6.Uncles)).To(Equal(0))
Expect(len(superNodePayload6.TransactionsRlp)).To(Equal(1)) Expect(len(iplds6.Transactions)).To(Equal(1))
Expect(shared.ListContainsBytes(superNodePayload5.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) Expect(shared.ListContainsBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload6.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds6.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload6.StateNodesRlp)).To(Equal(0)) Expect(len(iplds6.StateNodes)).To(Equal(0))
Expect(len(superNodePayload6.ReceiptsRlp)).To(Equal(1)) Expect(len(iplds6.Receipts)).To(Equal(1))
Expect(superNodePayload4.ReceiptsRlp[0]).To(Equal(expectedRctForStorageRLP2)) Expect(iplds4.Receipts[0]).To(Equal(expectedRctForStorageRLP2))
payload7, err := filterer.Filter(stateFilter, mocks.MockIPLDPayload) payload7, err := filterer.Filter(stateFilter, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload7, ok := payload7.(eth.StreamResponse) iplds7, ok := payload7.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0)) Expect(len(iplds7.Headers)).To(Equal(0))
Expect(len(superNodePayload7.UnclesRlp)).To(Equal(0)) Expect(len(iplds7.Uncles)).To(Equal(0))
Expect(len(superNodePayload7.TransactionsRlp)).To(Equal(0)) Expect(len(iplds7.Transactions)).To(Equal(0))
Expect(len(superNodePayload7.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds7.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload7.ReceiptsRlp)).To(Equal(0)) Expect(len(iplds7.Receipts)).To(Equal(0))
Expect(len(superNodePayload7.StateNodesRlp)).To(Equal(1)) Expect(len(iplds7.StateNodes)).To(Equal(1))
Expect(superNodePayload7.StateNodesRlp[mocks.ContractLeafKey]).To(Equal(mocks.ValueBytes)) Expect(iplds7.StateNodes[0].StateTrieKey).To(Equal(mocks.ContractLeafKey))
Expect(iplds7.StateNodes[0].IPLD).To(Equal(mocks.ValueBytes))
payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockIPLDPayload) payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
superNodePayload8, ok := payload8.(eth.StreamResponse) iplds8, ok := payload8.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(superNodePayload8.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64()))
Expect(len(superNodePayload8.HeadersRlp)).To(Equal(0)) Expect(len(iplds8.Headers)).To(Equal(0))
Expect(len(superNodePayload8.UnclesRlp)).To(Equal(0)) Expect(len(iplds8.Uncles)).To(Equal(0))
Expect(len(superNodePayload8.TransactionsRlp)).To(Equal(0)) Expect(len(iplds8.Transactions)).To(Equal(0))
Expect(len(superNodePayload8.StorageNodesRlp)).To(Equal(0)) Expect(len(iplds8.StorageNodes)).To(Equal(0))
Expect(len(superNodePayload8.StateNodesRlp)).To(Equal(0)) Expect(len(iplds8.StateNodes)).To(Equal(0))
Expect(len(superNodePayload8.ReceiptsRlp)).To(Equal(0)) Expect(len(iplds8.Receipts)).To(Equal(0))
}) })
}) })
}) })
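The assertions above change shape because lookups that used to index StateNodesRlp by key now scan the flat StateNodes slice. A hypothetical helper capturing the pattern the test loops spell out inline (the key and bytes in main are placeholders):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"

	"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)

// stateNodeRLP scans the slice for a state trie key and returns the node's
// raw IPLD bytes plus whether the key was present.
func stateNodeRLP(nodes []eth.StateNode, key common.Hash) ([]byte, bool) {
	for _, n := range nodes {
		if n.StateTrieKey == key {
			return n.IPLD, true
		}
	}
	return nil, false
}

func main() {
	key := common.HexToHash("0x01")
	nodes := []eth.StateNode{{StateTrieKey: key, IPLD: []byte{0xc0}, Leaf: true}}
	if raw, ok := stateNodeRLP(nodes, key); ok {
		fmt.Printf("%d byte(s) of RLP for %s\n", len(raw), key.Hex())
	}
}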

View File

@ -21,8 +21,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
@ -30,6 +28,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
var ( var (
@ -53,13 +52,13 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) {
} }
// Fetch is the exported method for fetching and returning all the IPLDs specified in the CIDWrapper // Fetch is the exported method for fetching and returning all the IPLDs specified in the CIDWrapper
func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) { func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper) cidWrapper, ok := cids.(*CIDWrapper)
if !ok { if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
} }
log.Debug("fetching iplds") log.Debug("fetching iplds")
iplds := new(IPLDWrapper) iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber iplds.BlockNumber = cidWrapper.BlockNumber
var err error var err error
iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers)
@ -91,91 +90,107 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, e
// FetchHeaders fetches headers // FetchHeaders fetches headers
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) { func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([][]byte, error) {
log.Debug("fetching header iplds") log.Debug("fetching header iplds")
headerCids := make([]cid.Cid, 0, len(cids)) headerCids := make([]cid.Cid, len(cids))
for _, c := range cids { for i, c := range cids {
dc, err := cid.Decode(c.CID) dc, err := cid.Decode(c.CID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
headerCids = append(headerCids, dc) headerCids[i] = dc
} }
headers := f.fetchBatch(headerCids) headers := f.fetchBatch(headerCids)
if len(headers) != len(headerCids) { headersRLP := make([][]byte, len(headers))
log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) for i, header := range headers {
return headers, errUnexpectedNumberOfIPLDs headersRLP[i] = header.RawData()
} }
return headers, nil if len(headersRLP) != len(headerCids) {
log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
return headersRLP, errUnexpectedNumberOfIPLDs
}
return headersRLP, nil
} }
// FetchUncles fetches uncles // FetchUncles fetches uncles
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([]blocks.Block, error) { func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([][]byte, error) {
log.Debug("fetching uncle iplds") log.Debug("fetching uncle iplds")
uncleCids := make([]cid.Cid, 0, len(cids)) uncleCids := make([]cid.Cid, len(cids))
for _, c := range cids { for i, c := range cids {
dc, err := cid.Decode(c.CID) dc, err := cid.Decode(c.CID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
uncleCids = append(uncleCids, dc) uncleCids[i] = dc
} }
uncles := f.fetchBatch(uncleCids) uncles := f.fetchBatch(uncleCids)
if len(uncles) != len(uncleCids) { unclesRLP := make([][]byte, len(uncles))
log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids)) for i, uncle := range uncles {
return uncles, errUnexpectedNumberOfIPLDs unclesRLP[i] = uncle.RawData()
} }
return uncles, nil if len(unclesRLP) != len(uncleCids) {
log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids))
return unclesRLP, errUnexpectedNumberOfIPLDs
}
return unclesRLP, nil
} }
// FetchTrxs fetches transactions // FetchTrxs fetches transactions
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]blocks.Block, error) { func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([][]byte, error) {
log.Debug("fetching transaction iplds") log.Debug("fetching transaction iplds")
trxCids := make([]cid.Cid, 0, len(cids)) trxCids := make([]cid.Cid, len(cids))
for _, c := range cids { for i, c := range cids {
dc, err := cid.Decode(c.CID) dc, err := cid.Decode(c.CID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
trxCids = append(trxCids, dc) trxCids[i] = dc
} }
trxs := f.fetchBatch(trxCids) trxs := f.fetchBatch(trxCids)
if len(trxs) != len(trxCids) { trxsRLP := make([][]byte, len(trxs))
log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) for i, trx := range trxs {
return trxs, errUnexpectedNumberOfIPLDs trxsRLP[i] = trx.RawData()
} }
return trxs, nil if len(trxsRLP) != len(trxCids) {
log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids))
return trxsRLP, errUnexpectedNumberOfIPLDs
}
return trxsRLP, nil
} }
// FetchRcts fetches receipts // FetchRcts fetches receipts
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([]blocks.Block, error) { func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([][]byte, error) {
log.Debug("fetching receipt iplds") log.Debug("fetching receipt iplds")
rctCids := make([]cid.Cid, 0, len(cids)) rctCids := make([]cid.Cid, len(cids))
for _, c := range cids { for i, c := range cids {
dc, err := cid.Decode(c.CID) dc, err := cid.Decode(c.CID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rctCids = append(rctCids, dc) rctCids[i] = dc
} }
rcts := f.fetchBatch(rctCids) rcts := f.fetchBatch(rctCids)
if len(rcts) != len(rctCids) { rctsRLP := make([][]byte, len(rcts))
log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids)) for i, rct := range rcts {
return rcts, errUnexpectedNumberOfIPLDs rctsRLP[i] = rct.RawData()
} }
return rcts, nil if len(rctsRLP) != len(rctCids) {
log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids))
return rctsRLP, errUnexpectedNumberOfIPLDs
}
return rctsRLP, nil
} }
// FetchState fetches state nodes // FetchState fetches state nodes
// It uses the single f.fetch method instead of the batch fetch, because it // It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state keys // needs to maintain the data's relation to state keys
func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks.Block, error) { func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds") log.Debug("fetching state iplds")
stateNodes := make(map[common.Hash]blocks.Block) stateNodes := make([]StateNode, len(cids))
for _, stateNode := range cids { for i, stateNode := range cids {
if stateNode.CID == "" || stateNode.StateKey == "" { if stateNode.CID == "" || stateNode.StateKey == "" {
continue continue
} }
@ -187,7 +202,11 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks.
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateNodes[common.HexToHash(stateNode.StateKey)] = state stateNodes[i] = StateNode{
IPLD: state.RawData(),
StateTrieKey: common.HexToHash(stateNode.StateKey),
Leaf: stateNode.Leaf,
}
} }
return stateNodes, nil return stateNodes, nil
} }
@ -195,10 +214,10 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) (map[common.Hash]blocks.
// FetchStorage fetches storage nodes // FetchStorage fetches storage nodes
// It uses the single f.fetch method instead of the batch fetch, because it // It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state and storage keys // needs to maintain the data's relation to state and storage keys
func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) (map[common.Hash]map[common.Hash]blocks.Block, error) { func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds") log.Debug("fetching storage iplds")
storageNodes := make(map[common.Hash]map[common.Hash]blocks.Block) storageNodes := make([]StorageNode, len(cids))
for _, storageNode := range cids { for i, storageNode := range cids {
if storageNode.CID == "" || storageNode.StorageKey == "" || storageNode.StateKey == "" { if storageNode.CID == "" || storageNode.StorageKey == "" || storageNode.StateKey == "" {
continue continue
} }
@ -210,10 +229,12 @@ func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) (map[com
if err != nil { if err != nil {
return nil, err return nil, err
} }
if storageNodes[common.HexToHash(storageNode.StateKey)] == nil { storageNodes[i] = StorageNode{
storageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) IPLD: storage.RawData(),
StateTrieKey: common.HexToHash(storageNode.StateKey),
StorageTrieKey: common.HexToHash(storageNode.StorageKey),
Leaf: storageNode.Leaf,
} }
storageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.StorageKey)] = storage
} }
return storageNodes, nil return storageNodes, nil
} }
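Every fetch method above now follows the same pattern: batch-fetch the IPFS blocks, then flatten them to raw byte slices via RawData() before returning. A standalone sketch of that flattening step with go-block-format (the helper name and placeholder bytes are illustrative):

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
)

// rawFromBlocks mirrors the RawData() extraction the reworked fetcher does
// before handing [][]byte back to the watcher.
func rawFromBlocks(blks []blocks.Block) [][]byte {
	out := make([][]byte, len(blks))
	for i, blk := range blks {
		out[i] = blk.RawData()
	}
	return out
}

func main() {
	blk := blocks.NewBlock([]byte{0xc0}) // placeholder payload
	raw := rawFromBlocks([]blocks.Block{blk})
	fmt.Printf("%d block(s), first is %d byte(s), cid %s\n", len(raw), len(raw[0]), blk.Cid())
}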

View File

@ -17,6 +17,7 @@
package eth_test package eth_test
import ( import (
"bytes"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -101,30 +102,32 @@ var _ = Describe("Fetcher", func() {
fetcher.BlockService = mockBlockService fetcher.BlockService = mockBlockService
i, err := fetcher.Fetch(mockCIDWrapper) i, err := fetcher.Fetch(mockCIDWrapper)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
iplds, ok := i.(*eth.IPLDWrapper) iplds, ok := i.(eth.IPLDs)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber))
Expect(len(iplds.Headers)).To(Equal(1)) Expect(len(iplds.Headers)).To(Equal(1))
Expect(iplds.Headers[0]).To(Equal(mockHeaderBlock)) Expect(iplds.Headers[0]).To(Equal(mockHeaderBlock.RawData()))
Expect(len(iplds.Uncles)).To(Equal(1)) Expect(len(iplds.Uncles)).To(Equal(1))
Expect(iplds.Uncles[0]).To(Equal(mockUncleBlock)) Expect(iplds.Uncles[0]).To(Equal(mockUncleBlock.RawData()))
Expect(len(iplds.Transactions)).To(Equal(1)) Expect(len(iplds.Transactions)).To(Equal(1))
Expect(iplds.Transactions[0]).To(Equal(mockTrxBlock)) Expect(iplds.Transactions[0]).To(Equal(mockTrxBlock.RawData()))
Expect(len(iplds.Receipts)).To(Equal(1)) Expect(len(iplds.Receipts)).To(Equal(1))
Expect(iplds.Receipts[0]).To(Equal(mockReceiptBlock)) Expect(iplds.Receipts[0]).To(Equal(mockReceiptBlock.RawData()))
Expect(len(iplds.StateNodes)).To(Equal(1)) Expect(len(iplds.StateNodes)).To(Equal(1))
stateNode, ok := iplds.StateNodes[common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")] Expect(iplds.StateNodes[0].StateTrieKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")))
Expect(ok).To(BeTrue()) Expect(iplds.StateNodes[0].Leaf).To(BeTrue())
Expect(stateNode).To(Equal(mockStateBlock)) Expect(iplds.StateNodes[0].IPLD).To(Equal(mockStateBlock.RawData()))
Expect(len(iplds.StorageNodes)).To(Equal(1)) Expect(len(iplds.StorageNodes)).To(Equal(2))
storageNodes := iplds.StorageNodes[common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")] for _, storage := range iplds.StorageNodes {
Expect(len(storageNodes)).To(Equal(2)) Expect(storage.Leaf).To(BeTrue())
storageNode1, ok := storageNodes[common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001")] Expect(storage.StateTrieKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")))
Expect(ok).To(BeTrue()) if bytes.Equal(storage.StorageTrieKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()) {
Expect(storageNode1).To(Equal(mockStorageBlock1)) Expect(storage.IPLD).To(Equal(mockStorageBlock1.RawData()))
storageNode2, ok := storageNodes[common.HexToHash("0000000000000000000000000000000000000000000000000000000000000002")] }
Expect(storageNode2).To(Equal(mockStorageBlock2)) if bytes.Equal(storage.StorageTrieKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000002").Bytes()) {
Expect(ok).To(BeTrue()) Expect(storage.IPLD).To(Equal(mockStorageBlock2.RawData()))
}
}
}) })
}) })
}) })

View File

@ -29,12 +29,12 @@ import (
// PayloadConverter is the underlying struct for the Converter interface // PayloadConverter is the underlying struct for the Converter interface
type PayloadConverter struct { type PayloadConverter struct {
PassedStatediffPayload statediff.Payload PassedStatediffPayload statediff.Payload
ReturnIPLDPayload eth.IPLDPayload ReturnIPLDPayload eth.ConvertedPayload
ReturnErr error ReturnErr error
} }
// Convert method is used to convert a geth statediff.Payload to an IPLDPayload // Convert method is used to convert a geth statediff.Payload to an IPLDPayload
func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
stateDiffPayload, ok := payload.(statediff.Payload) stateDiffPayload, ok := payload.(statediff.Payload)
if !ok { if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload)
@ -46,13 +46,13 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
// IterativePayloadConverter is the underlying struct for the Converter interface // IterativePayloadConverter is the underlying struct for the Converter interface
type IterativePayloadConverter struct { type IterativePayloadConverter struct {
PassedStatediffPayload []statediff.Payload PassedStatediffPayload []statediff.Payload
ReturnIPLDPayload []eth.IPLDPayload ReturnIPLDPayload []eth.ConvertedPayload
ReturnErr error ReturnErr error
iteration int iteration int
} }
// Convert method is used to convert a geth statediff.Payload to an IPLDPayload // Convert method is used to convert a geth statediff.Payload to an IPLDPayload
func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.ConvertedData, error) {
stateDiffPayload, ok := payload.(statediff.Payload) stateDiffPayload, ok := payload.(statediff.Payload)
if !ok { if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload)

View File

@ -26,16 +26,16 @@ import (
// IPLDPublisher is the underlying struct for the Publisher interface // IPLDPublisher is the underlying struct for the Publisher interface
type IPLDPublisher struct { type IPLDPublisher struct {
PassedIPLDPayload eth.IPLDPayload PassedIPLDPayload eth.ConvertedPayload
ReturnCIDPayload *eth.CIDPayload ReturnCIDPayload *eth.CIDPayload
ReturnErr error ReturnErr error
} }
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(eth.IPLDPayload) ipldPayload, ok := payload.(eth.ConvertedPayload)
if !ok { if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.IPLDPayload{}, payload) return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
} }
pub.PassedIPLDPayload = ipldPayload pub.PassedIPLDPayload = ipldPayload
return pub.ReturnCIDPayload, pub.ReturnErr return pub.ReturnCIDPayload, pub.ReturnErr
@ -43,17 +43,17 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing // IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
type IterativeIPLDPublisher struct { type IterativeIPLDPublisher struct {
PassedIPLDPayload []eth.IPLDPayload PassedIPLDPayload []eth.ConvertedPayload
ReturnCIDPayload []*eth.CIDPayload ReturnCIDPayload []*eth.CIDPayload
ReturnErr error ReturnErr error
iteration int iteration int
} }
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(eth.IPLDPayload) ipldPayload, ok := payload.(eth.ConvertedPayload)
if !ok { if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.IPLDPayload{}, payload) return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.ConvertedPayload{}, payload)
} }
pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload)
if len(pub.ReturnCIDPayload) < pub.iteration+1 { if len(pub.ReturnCIDPayload) < pub.iteration+1 {

View File

@ -23,6 +23,9 @@ import (
"math/big" "math/big"
rand2 "math/rand" rand2 "math/rand"
"github.com/multiformats/go-multihash"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -233,7 +236,7 @@ var (
TotalDifficulty: big.NewInt(1337), TotalDifficulty: big.NewInt(1337),
} }
MockIPLDPayload = eth.IPLDPayload{ MockConvertedPayload = eth.ConvertedPayload{
TotalDifficulty: big.NewInt(1337), TotalDifficulty: big.NewInt(1337),
Block: MockBlock, Block: MockBlock,
Receipts: MockReceipts, Receipts: MockReceipts,
@ -293,44 +296,55 @@ var (
}, },
}, },
} }
headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256)
trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(0), multihash.KECCAK_256)
trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(1), multihash.KECCAK_256)
rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(0), multihash.KECCAK_256)
rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(1), multihash.KECCAK_256)
state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, ValueBytes, multihash.KECCAK_256)
state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, AnotherValueBytes, multihash.KECCAK_256)
storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageValue, multihash.KECCAK_256)
MockIPLDWrapper = &eth.IPLDWrapper{ HeaderIPLD, _ = blocks.NewBlockWithCid(MockHeaderRlp, headerCID)
Trx1IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(0), trx1CID)
Trx2IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(1), trx2CID)
Rct1IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(0), rct1CID)
Rct2IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(1), rct2CID)
State1IPLD, _ = blocks.NewBlockWithCid(ValueBytes, state1CID)
State2IPLD, _ = blocks.NewBlockWithCid(AnotherValueBytes, state2CID)
StorageIPLD, _ = blocks.NewBlockWithCid(StorageValue, storageCID)
MockIPLDs = eth.IPLDs{
BlockNumber: big.NewInt(1), BlockNumber: big.NewInt(1),
Headers: []blocks.Block{ Headers: [][]byte{
blocks.NewBlock(MockHeaderRlp), HeaderIPLD.RawData(),
}, },
Transactions: []blocks.Block{ Transactions: [][]byte{
blocks.NewBlock(MockTransactions.GetRlp(0)), Trx1IPLD.RawData(),
blocks.NewBlock(MockTransactions.GetRlp(1)), Trx2IPLD.RawData(),
}, },
Receipts: []blocks.Block{ Receipts: [][]byte{
blocks.NewBlock(MockReceipts.GetRlp(0)), Rct1IPLD.RawData(),
blocks.NewBlock(MockReceipts.GetRlp(1)), Rct2IPLD.RawData(),
}, },
StateNodes: map[common.Hash]blocks.Block{ StateNodes: []eth2.StateNode{
ContractLeafKey: blocks.NewBlock(ValueBytes), {
AnotherContractLeafKey: blocks.NewBlock(AnotherValueBytes), StateTrieKey: ContractLeafKey,
}, Leaf: true,
StorageNodes: map[common.Hash]map[common.Hash]blocks.Block{ IPLD: State1IPLD.RawData(),
ContractLeafKey: { },
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"): blocks.NewBlock(StorageValue), {
StateTrieKey: AnotherContractLeafKey,
Leaf: true,
IPLD: State2IPLD.RawData(),
}, },
}, },
} StorageNodes: []eth2.StorageNode{
{
MockSeedNodePayload = eth2.StreamResponse{ StateTrieKey: ContractLeafKey,
BlockNumber: big.NewInt(1), StorageTrieKey: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"),
HeadersRlp: [][]byte{MockHeaderRlp}, Leaf: true,
UnclesRlp: [][]byte{}, IPLD: StorageIPLD.RawData(),
TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},
ReceiptsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},
StateNodesRlp: map[common.Hash][]byte{
ContractLeafKey: ValueBytes,
AnotherContractLeafKey: AnotherValueBytes,
},
StorageNodesRlp: map[common.Hash]map[common.Hash][]byte{
ContractLeafKey: {
common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"): StorageValue,
}, },
}, },
} }
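The mocks now derive CIDs the same way the publisher does: hash the raw RLP with the newly exported ipld.RawdataToCid, then pair data and CID via blocks.NewBlockWithCid. A condensed sketch of that pairing (repo import paths assumed; the placeholder bytes stand in for real header RLP):

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/multiformats/go-multihash"

	"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
)

func main() {
	rawHeader := []byte{0xc0} // placeholder; real callers pass header RLP

	// Derive the keccak-256 CID under the eth-header codec, as the mocks do.
	c, err := ipld.RawdataToCid(ipld.MEthHeader, rawHeader, multihash.KECCAK_256)
	if err != nil {
		panic(err)
	}

	// Wrap the bytes with their CID so RawData()/Cid() round-trip cleanly.
	blk, err := blocks.NewBlockWithCid(rawHeader, c)
	if err != nil {
		panic(err)
	}
	fmt.Println(blk.Cid().String(), "->", len(blk.RawData()), "byte(s)")
}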

View File

@ -53,10 +53,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
} }
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(IPLDPayload) ipldPayload, ok := payload.(ConvertedPayload)
if !ok { if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", IPLDPayload{}, payload) return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload)
} }
// Process and publish headers // Process and publish headers
headerCid, err := pub.publishHeader(ipldPayload.Block.Header()) headerCid, err := pub.publishHeader(ipldPayload.Block.Header())

View File

@ -48,8 +48,8 @@ var _ = Describe("Publisher", func() {
mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"}
mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2"} mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2"}
mockRctDagPutter.CIDsToReturn = []string{"mockRctCID1", "mockRctCID2"} mockRctDagPutter.CIDsToReturn = []string{"mockRctCID1", "mockRctCID2"}
val1 := common.BytesToHash(mocks.MockIPLDPayload.StateNodes[0].Value) val1 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[0].Value)
val2 := common.BytesToHash(mocks.MockIPLDPayload.StateNodes[1].Value) val2 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[1].Value)
mockStateDagPutter.CIDsToReturn = map[common.Hash][]string{ mockStateDagPutter.CIDsToReturn = map[common.Hash][]string{
val1: {"mockStateCID1"}, val1: {"mockStateCID1"},
val2: {"mockStateCID2"}, val2: {"mockStateCID2"},
@ -62,11 +62,11 @@ var _ = Describe("Publisher", func() {
StatePutter: mockStateDagPutter, StatePutter: mockStateDagPutter,
StoragePutter: mockStorageDagPutter, StoragePutter: mockStorageDagPutter,
} }
payload, err := publisher.Publish(mocks.MockIPLDPayload) payload, err := publisher.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cidPayload, ok := payload.(*eth.CIDPayload) cidPayload, ok := payload.(*eth.CIDPayload)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(cidPayload.HeaderCID.TotalDifficulty).To(Equal(mocks.MockIPLDPayload.TotalDifficulty.String())) Expect(cidPayload.HeaderCID.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty.String()))
Expect(cidPayload.HeaderCID.BlockNumber).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockNumber)) Expect(cidPayload.HeaderCID.BlockNumber).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockNumber))
Expect(cidPayload.HeaderCID.BlockHash).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockHash)) Expect(cidPayload.HeaderCID.BlockHash).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockHash))
Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs)) Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs))

View File

@ -1,78 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format"
)
// IPLDResolver satisfies the IPLDResolver interface for ethereum
type IPLDResolver struct{}
// NewIPLDResolver returns a pointer to an IPLDResolver which satisfies the IPLDResolver interface
func NewIPLDResolver() *IPLDResolver {
return &IPLDResolver{}
}
// Resolve is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) {
ipfsBlocks, ok := iplds.(*IPLDWrapper)
if !ok {
return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds)
}
return StreamResponse{
BlockNumber: ipfsBlocks.BlockNumber,
HeadersRlp: eir.resolve(ipfsBlocks.Headers),
UnclesRlp: eir.resolve(ipfsBlocks.Uncles),
TransactionsRlp: eir.resolve(ipfsBlocks.Transactions),
ReceiptsRlp: eir.resolve(ipfsBlocks.Receipts),
StateNodesRlp: eir.resolveState(ipfsBlocks.StateNodes),
StorageNodesRlp: eir.resolveStorage(ipfsBlocks.StorageNodes),
}, nil
}
func (eir *IPLDResolver) resolve(iplds []blocks.Block) [][]byte {
rlps := make([][]byte, 0, len(iplds))
for _, ipld := range iplds {
rlps = append(rlps, ipld.RawData())
}
return rlps
}
func (eir *IPLDResolver) resolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte {
stateNodes := make(map[common.Hash][]byte, len(iplds))
for key, ipld := range iplds {
stateNodes[key] = ipld.RawData()
}
return stateNodes
}
func (eir *IPLDResolver) resolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte {
storageNodes := make(map[common.Hash]map[common.Hash][]byte)
for stateKey, storageIPLDs := range iplds {
storageNodes[stateKey] = make(map[common.Hash][]byte)
for storageKey, storageVal := range storageIPLDs {
storageNodes[stateKey][storageKey] = storageVal.RawData()
}
}
return storageNodes
}

View File

@ -1,55 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
)
var (
resolver *eth.IPLDResolver
)
var _ = Describe("Resolver", func() {
Describe("ResolveIPLDs", func() {
BeforeEach(func() {
resolver = eth.NewIPLDResolver()
})
It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() {
payload, err := resolver.Resolve(mocks.MockIPLDWrapper)
Expect(err).ToNot(HaveOccurred())
superNodePayload, ok := payload.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp))
Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeedNodePayload.UnclesRlp))
Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload.ReceiptsRlp)).To(Equal(2))
Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(0))).To(BeTrue())
Expect(shared.ListContainsBytes(superNodePayload.ReceiptsRlp, mocks.MockReceipts.GetRlp(1))).To(BeTrue())
Expect(len(superNodePayload.StateNodesRlp)).To(Equal(2))
Expect(superNodePayload.StorageNodesRlp).To(Equal(mocks.MockSeedNodePayload.StorageNodesRlp))
})
})
})

View File

@ -17,18 +17,16 @@
package eth package eth
import ( import (
"encoding/json"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-block-format"
) )
// IPLDPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers // ConvertedPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers
// Returned by PayloadConverter // Returned by PayloadConverter
// Passed to IPLDPublisher and ResponseFilterer // Passed to IPLDPublisher and ResponseFilterer
type IPLDPayload struct { type ConvertedPayload struct {
TotalDifficulty *big.Int TotalDifficulty *big.Int
Block *types.Block Block *types.Block
TxMetaData []TxModel TxMetaData []TxModel
@ -39,7 +37,7 @@ type IPLDPayload struct {
} }
// Height satisfies the StreamedIPLDs interface // Height satisfies the StreamedIPLDs interface
func (i IPLDPayload) Height() int64 { func (i ConvertedPayload) Height() int64 {
return i.Block.Number().Int64() return i.Block.Number().Int64()
} }
@ -75,54 +73,32 @@ type CIDWrapper struct {
StorageNodes []StorageNodeWithStateKeyModel StorageNodes []StorageNodeWithStateKeyModel
} }
// IPLDWrapper is used to package raw IPLD block data fetched from IPFS // IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server
// Returned by IPLDFetcher // Returned by IPLDFetcher and ResponseFilterer
// Passed to IPLDResolver type IPLDs struct {
type IPLDWrapper struct {
BlockNumber *big.Int BlockNumber *big.Int
Headers []blocks.Block Headers [][]byte
Uncles []blocks.Block Uncles [][]byte
Transactions []blocks.Block Transactions [][]byte
Receipts []blocks.Block Receipts [][]byte
StateNodes map[common.Hash]blocks.Block StateNodes []StateNode
StorageNodes map[common.Hash]map[common.Hash]blocks.Block StorageNodes []StorageNode
} }
// StreamResponse holds the data streamed from the super node eth service to the requesting clients // Height satisfies the StreamedIPLDs interface
// Returned by IPLDResolver and ResponseFilterer func (i IPLDs) Height() int64 {
// Passed to client subscriptions return i.BlockNumber.Int64()
type StreamResponse struct {
BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
TransactionsRlp [][]byte `json:"transactionsRlp"`
ReceiptsRlp [][]byte `json:"receiptsRlp"`
StateNodesRlp map[common.Hash][]byte `json:"stateNodesRlp"`
StorageNodesRlp map[common.Hash]map[common.Hash][]byte `json:"storageNodesRlp"`
encoded []byte
err error
} }
// Height satisfies the ServerResponse interface type StateNode struct {
func (sr StreamResponse) Height() int64 { StateTrieKey common.Hash
return sr.BlockNumber.Int64() IPLD []byte
Leaf bool
} }
func (sr *StreamResponse) ensureEncoded() { type StorageNode struct {
if sr.encoded == nil && sr.err == nil { StateTrieKey common.Hash
sr.encoded, sr.err = json.Marshal(sr) StorageTrieKey common.Hash
} IPLD []byte
} Leaf bool
// Length to implement Encoder interface for StateDiff
func (sr *StreamResponse) Length() int {
sr.ensureEncoded()
return len(sr.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sr *StreamResponse) Encode() ([]byte, error) {
sr.ensureEncoded()
return sr.encoded, sr.err
} }
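
The reworked types above drop the blocks.Block wrapper and the JSON StreamResponse in favor of plain byte slices keyed by trie keys. A hedged sketch of building one of the new payloads, using local mirror types rather than the real package (the struct shapes are copied from the hunk above; the values are made up):

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// Local mirrors of the new payload shapes defined in the hunk above.
type StateNode struct {
	StateTrieKey common.Hash
	IPLD         []byte
	Leaf         bool
}

type StorageNode struct {
	StateTrieKey   common.Hash
	StorageTrieKey common.Hash
	IPLD           []byte
	Leaf           bool
}

type IPLDs struct {
	BlockNumber  *big.Int
	Headers      [][]byte
	Uncles       [][]byte
	Transactions [][]byte
	Receipts     [][]byte
	StateNodes   []StateNode
	StorageNodes []StorageNode
}

// Height satisfies the same one-method contract the server types rely on.
func (i IPLDs) Height() int64 { return i.BlockNumber.Int64() }

func main() {
	payload := IPLDs{
		BlockNumber: big.NewInt(1000000),
		Headers:     [][]byte{{0xc0}}, // placeholder RLP bytes
		StateNodes: []StateNode{{
			StateTrieKey: common.HexToHash("0x01"),
			IPLD:         []byte{0xc0},
			Leaf:         true,
		}},
	}
	fmt.Println(payload.Height())
}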

View File

@ -44,9 +44,9 @@ type SuperNode interface {
// APIs(), Protocols(), Start() and Stop() // APIs(), Protocols(), Start() and Stop()
node.Service node.Service
// Data processing event loop // Data processing event loop
ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
// Pub-Sub handling event loop // Pub-Sub handling event loop
FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
// Method to subscribe to the service // Method to subscribe to the service
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
// Method to unsubscribe from the service // Method to unsubscribe from the service
@ -73,8 +73,6 @@ type Service struct {
IPLDFetcher shared.IPLDFetcher IPLDFetcher shared.IPLDFetcher
// Interface for searching and retrieving CIDs from Postgres index // Interface for searching and retrieving CIDs from Postgres index
Retriever shared.CIDRetriever Retriever shared.CIDRetriever
// Interface for resolving IPLDs to their data types
Resolver shared.IPLDResolver
// Chan the processor uses to subscribe to payloads from the Streamer // Chan the processor uses to subscribe to payloads from the Streamer
PayloadChan chan shared.RawChainData PayloadChan chan shared.RawChainData
// Used to signal shutdown of the service // Used to signal shutdown of the service
@ -132,10 +130,6 @@ func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
sn.Resolver, err = NewIPLDResolver(settings.Chain)
if err != nil {
return nil, err
}
} }
sn.QuitChan = settings.Quit sn.QuitChan = settings.Quit
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
@ -175,7 +169,7 @@ func (sap *Service) APIs() []rpc.API {
// It forwards the converted data to the publishAndIndex process(es) it spins up // It forwards the converted data to the publishAndIndex process(es) it spins up
// It forwards the converted data to a ScreenAndServe process if there is one listening on the passed screenAndServePayload channel // It forwards the converted data to a ScreenAndServe process if there is one listening on the passed screenAndServePayload channel
// This continues on no matter if or how many subscribers there are // This continues on no matter if or how many subscribers there are
func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error { func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan) sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil { if err != nil {
return err return err
@ -183,7 +177,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
wg.Add(1) wg.Add(1)
// Channels for forwarding data to the publishAndIndex workers // Channels for forwarding data to the publishAndIndex workers
publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
// publishAndIndex worker pool to handle publishing and indexing concurrently, while // publishAndIndex worker pool to handle publishing and indexing concurrently, while
// limiting the number of Postgres connections we can possibly open so as to prevent errors // limiting the number of Postgres connections we can possibly open so as to prevent errors
for i := 0; i < sap.WorkerPoolSize; i++ { for i := 0; i < sap.WorkerPoolSize; i++ {
@ -220,7 +214,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process // publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs) { func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) {
go func() { go func() {
for { for {
select { select {
@ -243,7 +237,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
// It filters and sends this data to any subscribers to the service // It filters and sends this data to any subscribers to the service
// This process can be stood up alone, without a screenAndServePayload attached to a SyncAndConvert process // This process can be stood up alone, without a screenAndServePayload attached to a SyncAndConvert process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) { func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
wg.Add(1) wg.Add(1)
go func() { go func() {
for { for {
@ -261,7 +255,7 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
} }
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) { func (sap *Service) filterAndServe(payload shared.ConvertedData) {
log.Debugf("Sending payload to subscriptions") log.Debugf("Sending payload to subscriptions")
sap.Lock() sap.Lock()
for ty, subs := range sap.Subscriptions { for ty, subs := range sap.Subscriptions {
@ -284,9 +278,14 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) {
sap.closeType(ty) sap.closeType(ty)
continue continue
} }
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Error(err)
continue
}
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Flag: EmptyFlag}: case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node payload to subscription %s", id) log.Debugf("sending super node payload to subscription %s", id)
default: default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id) log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
@ -372,18 +371,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
if empty { if empty {
continue continue
} }
blocksWrapper, err := sap.IPLDFetcher.Fetch(cidWrapper) response, err := sap.IPLDFetcher.Fetch(cidWrapper)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error())) sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error()))
continue continue
} }
backFillIplds, err := sap.Resolver.Resolve(blocksWrapper) responseRLP, err := rlp.EncodeToBytes(response)
if err != nil { if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("IPLD Resolving error at block %d\r%s", i, err.Error())) log.Error(err)
continue continue
} }
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Flag: EmptyFlag}: case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
log.Debugf("sending super node historical data payload to subscription %s", id) log.Debugf("sending super node historical data payload to subscription %s", id)
default: default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
@ -420,7 +419,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
func (sap *Service) Start(*p2p.Server) error { func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting super node service") log.Info("Starting super node service")
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
if err := sap.ProcessData(wg, payloadChan); err != nil { if err := sap.ProcessData(wg, payloadChan); err != nil {
return err return err
} }
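
Both the live and historical paths now call rlp.EncodeToBytes on the response and ship the resulting bytes, plus a height, in the subscription payload. A minimal sketch of that round trip with a simplified stand-in response type (the field names here are illustrative, not the real eth types):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// response is a simplified stand-in for the chain-specific data the service serves.
type response struct {
	BlockNumber uint64
	HeadersRLP  [][]byte
}

// subscriptionPayload mirrors the wire envelope: opaque RLP bytes plus a height.
type subscriptionPayload struct {
	Data   []byte
	Height int64
}

func main() {
	res := response{BlockNumber: 42, HeadersRLP: [][]byte{{0xc0}}}

	// Server side: encode once, then fan the same bytes out to every subscriber.
	data, err := rlp.EncodeToBytes(res)
	if err != nil {
		panic(err)
	}
	msg := subscriptionPayload{Data: data, Height: int64(res.BlockNumber)}

	// Client side: decode the opaque bytes back into the expected shape.
	var decoded response
	if err := rlp.DecodeBytes(msg.Data, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.BlockNumber, msg.Height)
}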

View File

@ -51,7 +51,7 @@ var _ = Describe("Service", func() {
ReturnErr: nil, ReturnErr: nil,
} }
mockConverter := &mocks.PayloadConverter{ mockConverter := &mocks.PayloadConverter{
ReturnIPLDPayload: mocks.MockIPLDPayload, ReturnIPLDPayload: mocks.MockConvertedPayload,
ReturnErr: nil, ReturnErr: nil,
} }
processor := &super_node.Service{ processor := &super_node.Service{
@ -71,7 +71,7 @@ var _ = Describe("Service", func() {
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1)) Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockConvertedPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
}) })
}) })

View File

@ -32,12 +32,12 @@ type PayloadFetcher interface {
// PayloadConverter converts chain-specific payloads into IPLD payloads for publishing // PayloadConverter converts chain-specific payloads into IPLD payloads for publishing
type PayloadConverter interface { type PayloadConverter interface {
Convert(payload RawChainData) (StreamedIPLDs, error) Convert(payload RawChainData) (ConvertedData, error)
} }
// IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing // IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing
type IPLDPublisher interface { type IPLDPublisher interface {
Publish(payload StreamedIPLDs) (CIDsForIndexing, error) Publish(payload ConvertedData) (CIDsForIndexing, error)
} }
// CIDIndexer indexes a CID payload in Postgres // CIDIndexer indexes a CID payload in Postgres
@ -47,7 +47,7 @@ type CIDIndexer interface {
// ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet // ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet
type ResponseFilterer interface { type ResponseFilterer interface {
Filter(filter SubscriptionSettings, payload StreamedIPLDs) (response ServerResponse, err error) Filter(filter SubscriptionSettings, payload ConvertedData) (response IPLDs, err error)
} }
// CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper // CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper
@ -60,12 +60,7 @@ type CIDRetriever interface {
// IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper // IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper
type IPLDFetcher interface { type IPLDFetcher interface {
Fetch(cids CIDsForFetching) (FetchedIPLDs, error) Fetch(cids CIDsForFetching) (IPLDs, error)
}
// IPLDResolver resolves an IPLD wrapper into chain-specific payloads
type IPLDResolver interface {
Resolve(iplds FetchedIPLDs) (ServerResponse, error)
} }
// ClientSubscription is a general interface for chain data subscriptions // ClientSubscription is a general interface for chain data subscriptions
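
With the IPLDResolver interface gone, IPLDFetcher returns the final IPLDs directly. A small sketch of a stub fetcher satisfying that narrowed shape, using local mirrors of the shared interfaces since the package wiring isn't reproduced here:

package main

import "fmt"

// Local mirrors of the shared interface types, for illustration only.
type CIDsForFetching interface{}

type IPLDs interface {
	Height() int64
}

// IPLDFetcher now hands back the fetched data itself; there is no separate resolve step.
type IPLDFetcher interface {
	Fetch(cids CIDsForFetching) (IPLDs, error)
}

// stubIPLDs is a trivial IPLDs implementation pinned to one height.
type stubIPLDs struct{ height int64 }

func (s stubIPLDs) Height() int64 { return s.height }

// stubFetcher ignores its input and returns a fixed payload.
type stubFetcher struct{}

func (stubFetcher) Fetch(_ CIDsForFetching) (IPLDs, error) {
	return stubIPLDs{height: 7}, nil
}

func main() {
	var f IPLDFetcher = stubFetcher{}
	iplds, err := f.Fetch(nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(iplds.Height())
}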

View File

@ -23,8 +23,8 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// IPLDFetcher mock for tests // PayloadFetcher mock for tests
type IPLDFetcher struct { type PayloadFetcher struct {
PayloadsToReturn map[uint64]shared.RawChainData PayloadsToReturn map[uint64]shared.RawChainData
FetchErrs map[uint64]error FetchErrs map[uint64]error
CalledAtBlockHeights [][]uint64 CalledAtBlockHeights [][]uint64
@ -32,7 +32,7 @@ type IPLDFetcher struct {
} }
// FetchAt mock method // FetchAt mock method
func (fetcher *IPLDFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
if fetcher.PayloadsToReturn == nil { if fetcher.PayloadsToReturn == nil {
return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
} }

View File

@ -21,8 +21,8 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// MockCIDRetriever is a mock CID retriever for use in tests // CIDRetriever is a mock CID retriever for use in tests
type MockCIDRetriever struct { type CIDRetriever struct {
GapsToRetrieve []shared.Gap GapsToRetrieve []shared.Gap
GapsToRetrieveErr error GapsToRetrieveErr error
CalledTimes int CalledTimes int
@ -31,34 +31,34 @@ type MockCIDRetriever struct {
} }
// RetrieveCIDs mock method // RetrieveCIDs mock method
func (*MockCIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
panic("implement me") panic("implement me")
} }
// RetrieveLastBlockNumber mock method // RetrieveLastBlockNumber mock method
func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) { func (*CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
panic("implement me") panic("implement me")
} }
// RetrieveFirstBlockNumber mock method // RetrieveFirstBlockNumber mock method
func (mcr *MockCIDRetriever) RetrieveFirstBlockNumber() (int64, error) { func (mcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr return mcr.FirstBlockNumberToReturn, mcr.RetrieveFirstBlockNumberErr
} }
// RetrieveGapsInData mock method // RetrieveGapsInData mock method
func (mcr *MockCIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { func (mcr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) {
mcr.CalledTimes++ mcr.CalledTimes++
return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr
} }
// SetGapsToRetrieve mock method // SetGapsToRetrieve mock method
func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps []shared.Gap) { func (mcr *CIDRetriever) SetGapsToRetrieve(gaps []shared.Gap) {
if mcr.GapsToRetrieve == nil { if mcr.GapsToRetrieve == nil {
mcr.GapsToRetrieve = make([]shared.Gap, 0) mcr.GapsToRetrieve = make([]shared.Gap, 0)
} }
mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...)
} }
func (mcr *MockCIDRetriever) Database() *postgres.DB { func (mcr *CIDRetriever) Database() *postgres.DB {
panic("implement me") panic("implement me")
} }

View File

@ -20,7 +20,7 @@ package shared
type RawChainData interface{} type RawChainData interface{}
// The concrete type underneath StreamedIPLDs should not be a pointer // The concrete type underneath StreamedIPLDs should not be a pointer
type StreamedIPLDs interface { type ConvertedData interface {
Height() int64 Height() int64
} }
@ -28,10 +28,7 @@ type CIDsForIndexing interface{}
type CIDsForFetching interface{} type CIDsForFetching interface{}
type FetchedIPLDs interface{} type IPLDs interface {
// The concrete type underneath StreamedIPLDs should not be a pointer
type ServerResponse interface {
Height() int64 Height() int64
} }

View File

@ -20,7 +20,6 @@ import (
"errors" "errors"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
type Flag int32 type Flag int32
@ -40,9 +39,10 @@ type Subscription struct {
// SubscriptionPayload is the struct for a super node stream payload // SubscriptionPayload is the struct for a super node stream payload
// It carries data of a type specific to the chain being supported/queried and an error message // It carries data of a type specific to the chain being supported/queried and an error message
type SubscriptionPayload struct { type SubscriptionPayload struct {
Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload
Err string `json:"err"` // field for error Height int64 `json:"height"`
Flag Flag `json:"flag"` // field for message Err string `json:"err"` // field for error
Flag Flag `json:"flag"` // field for message
} }
func (sp SubscriptionPayload) Error() error { func (sp SubscriptionPayload) Error() error {
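
Since Data is now an opaque RLP blob tagged with Height, a subscriber checks the error and flag first and only then decodes. A hedged sketch of that consumption order; the Error() body and the EmptyFlag value below are a plausible reading, as neither is shown in this hunk:

package main

import (
	"errors"
	"fmt"
)

type Flag int32

const EmptyFlag Flag = iota // assumed zero-value flag for ordinary payloads

// SubscriptionPayload mirrors the new envelope: opaque data, height, error string, flag.
type SubscriptionPayload struct {
	Data   []byte `json:"data"`
	Height int64  `json:"height"`
	Err    string `json:"err"`
	Flag   Flag   `json:"flag"`
}

// Error surfaces the carried error string, if any (body assumed, not shown in the hunk).
func (sp SubscriptionPayload) Error() error {
	if sp.Err == "" {
		return nil
	}
	return errors.New(sp.Err)
}

// handle shows the order a subscriber would check things in:
// surface errors first, then act on the height-tagged data.
func handle(sp SubscriptionPayload) {
	if err := sp.Error(); err != nil {
		fmt.Println("payload error:", err)
		return
	}
	fmt.Printf("received %d bytes at height %d\n", len(sp.Data), sp.Height)
}

func main() {
	handle(SubscriptionPayload{Data: []byte{0xc0}, Height: 100})
	handle(SubscriptionPayload{Err: "example failure"})
}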

View File

@ -24,7 +24,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" "github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
) )
// NewSuperNodeStreamer returns a new shared.SuperNodeStreamer // NewSuperNodeStreamer returns a new shared.SuperNodeStreamer

View File

@ -17,9 +17,11 @@
package eth package eth
import ( import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" "github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
) )
// Repository is the underlying struct for satisfying the shared.Repository interface for eth // Repository is the underlying struct for satisfying the shared.Repository interface for eth
@ -56,3 +58,27 @@ func (r *Repository) GetQueueData(height int64, hash string) (super_node.Subscri
func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error { func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error {
panic("implement me") panic("implement me")
} }
func (r *Repository) readyHeader(header *types.Header) error {
panic("implement me")
}
func (r *Repository) readyUncle(uncle *types.Header) error {
panic("implement me")
}
func (r *Repository) readyTxs(transactions types.Transactions) error {
panic("implement me")
}
func (r *Repository) readyRcts(receipts types.Receipts) error {
panic("implement me")
}
func (r *Repository) readyState(stateNodes map[common.Address][]byte) error {
panic("implement me")
}
func (r *Repository) readyStorage(storageNodes map[common.Address]map[common.Address][]byte) error {
panic("implement me")
}
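
The ready* hooks above are still stubs. One plausible shape for readyHeader, sketched against database/sql with a hypothetical watched_headers table (the schema, table, and column names are invented for illustration and are not the vulcanizedb schema); it compiles but needs a registered SQL driver to actually run:

package watcherexample

import (
	"database/sql"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rlp"
)

// readyHeader sketches one way the stub could persist a header: RLP-encode it and
// upsert it by block number. The watched_headers table and its columns are
// hypothetical placeholders, not part of the vulcanizedb schema.
func readyHeader(db *sql.DB, header *types.Header) error {
	headerRLP, err := rlp.EncodeToBytes(header)
	if err != nil {
		return err
	}
	_, err = db.Exec(
		`INSERT INTO watched_headers (block_number, block_hash, rlp_data)
		 VALUES ($1, $2, $3)
		 ON CONFLICT (block_number) DO UPDATE SET block_hash = $2, rlp_data = $3`,
		header.Number.Uint64(), header.Hash().Hex(), headerRLP,
	)
	return err
}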

View File

@ -24,12 +24,12 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
"github.com/vulcanize/vulcanizedb/pkg/wasm" "github.com/vulcanize/vulcanizedb/pkg/wasm"
"github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
) )
// SuperNodeWatcher is the top level interface for watching data from super node // Watcher is the top level interface for watching data from super node
type SuperNodeWatcher interface { type Watcher interface {
Init() error Init() error
Watch(wg *sync.WaitGroup) error Watch(wg *sync.WaitGroup) error
} }
@ -56,8 +56,8 @@ type Service struct {
backFilling *int32 // 0 => not backfilling; 1 => backfilling backFilling *int32 // 0 => not backfilling; 1 => backfilling
} }
// NewSuperNodeWatcher returns a new Service which satisfies the SuperNodeWatcher interface // NewWatcher returns a new Service which satisfies the Watcher interface
func NewSuperNodeWatcher(c Config, quitChan chan bool) (SuperNodeWatcher, error) { func NewWatcher(c Config, quitChan chan bool) (Watcher, error) {
repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions) repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions)
if err != nil { if err != nil {
return nil, err return nil, err
@ -119,7 +119,7 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
logrus.Error(payload.Error()) logrus.Error(payload.Error())
continue continue
} }
if payload.Data.Height() == atomic.LoadInt64(s.payloadIndex) { if payload.Height == atomic.LoadInt64(s.payloadIndex) {
// If the data is at our current index it is ready to be processed; add it to the ready data queue // If the data is at our current index it is ready to be processed; add it to the ready data queue
if err := s.Repository.ReadyData(payload); err != nil { if err := s.Repository.ReadyData(payload); err != nil {
logrus.Error(err) logrus.Error(err)
@ -133,7 +133,7 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
case err := <-sub.Err(): case err := <-sub.Err():
logrus.Error(err) logrus.Error(err)
case <-s.QuitChan: case <-s.QuitChan:
logrus.Info("WatchContract shutting down") logrus.Info("Watcher shutting down")
wg.Done() wg.Done()
return return
} }
@ -168,7 +168,7 @@ func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscri
case err := <-sub.Err(): case err := <-sub.Err():
logrus.Error(err) logrus.Error(err)
case <-s.QuitChan: case <-s.QuitChan:
logrus.Info("WatchContract shutting down") logrus.Info("Watcher shutting down")
wg.Done() wg.Done()
return return
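
Both queuing loops gate on the payload's Height against an atomically tracked index: payloads at the current index are readied for immediate processing, everything else is queued for later ordering. A minimal sketch of that gate in isolation; advancing the index after a successful ready is an assumption here, as that step isn't shown in these hunks:

package main

import (
	"fmt"
	"sync/atomic"
)

// payload carries only what the queueing decision needs.
type payload struct {
	Height int64
	Data   []byte
}

// route mimics the branch in combinedQueuing: payloads at the current index are
// "ready" for immediate processing, anything else is queued.
func route(index *int64, p payload, ready func(payload) error, queue func(payload) error) error {
	if p.Height == atomic.LoadInt64(index) {
		if err := ready(p); err != nil {
			return err
		}
		atomic.AddInt64(index, 1) // advance to the next expected height (assumed behavior)
		return nil
	}
	return queue(p)
}

func main() {
	index := int64(100)
	ready := func(p payload) error { fmt.Println("ready:", p.Height); return nil }
	queue := func(p payload) error { fmt.Println("queued:", p.Height); return nil }

	_ = route(&index, payload{Height: 100}, ready, queue) // matches the index: readied
	_ = route(&index, payload{Height: 105}, ready, queue) // ahead of the index: queued
}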
} }