diff --git a/go.mod b/go.mod index e3ef318a..20d6c44b 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 github.com/bren2010/proquint v0.0.0-20160323162903-38337c27106d github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 + github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/goleveldb v1.0.0 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v3 v3.0.0 diff --git a/go.sum b/go.sum index a9686920..a35cf9e8 100644 --- a/go.sum +++ b/go.sum @@ -47,14 +47,18 @@ github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcug github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 h1:mOg8/RgDSHTQ1R0IR+LMDuW4TDShPv+JzYHuR4GLoNA= github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= +github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d h1:yJzD/yFppdVCf6ApMkVy8cUxV0XrxdP9rVf6D87/Mng= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= +github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= diff --git a/pkg/super_node/btc/streamer.go b/pkg/super_node/btc/streamer.go new file mode 100644 index 00000000..cd3960f5 --- /dev/null +++ b/pkg/super_node/btc/streamer.go @@ -0,0 +1,86 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package btc + +import ( + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +const ( + PayloadChanBufferSize = 20000 // the max eth sub buffer size +) + +// PayloadStreamer satisfies the PayloadStreamer interface for bitcoin +type PayloadStreamer struct { + Config *rpcclient.ConnConfig +} + +// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for bitcoin +func NewPayloadStreamer(clientConfig *rpcclient.ConnConfig) *PayloadStreamer { + return &PayloadStreamer{ + Config: clientConfig, + } +} + +// Stream is the main loop for subscribing to data from the btc block notifications +// Satisfies the shared.PayloadStreamer interface +func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) { + logrus.Info("streaming block payloads from btc") + blockNotificationHandler := rpcclient.NotificationHandlers{ + // Notification handler for block connections, forwards new block data to the payloadChan + OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) { + payloadChan <- BlockPayload{ + Height: height, + Header: header, + Txs: txs, + } + }, + } + // Create a new client, and connect to btc ws server + client, err := rpcclient.New(ps.Config, &blockNotificationHandler) + if err != nil { + return nil, err + } + // Register for block connect notifications. + if err := client.NotifyBlocks(); err != nil { + return nil, err + } + client.WaitForShutdown() + return &ClientSubscription{client: client}, nil +} + +// ClientSubscription is a wrapper around the underlying btcd rpc client +// to fit the shared.ClientSubscription interface +type ClientSubscription struct { + client *rpcclient.Client +} + +// Unsubscribe satisfies the rpc.Subscription interface +func (bcs *ClientSubscription) Unsubscribe() { + bcs.client.Shutdown() +} + +// Err() satisfies the rpc.Subscription interface with a dummy err channel +func (bcs *ClientSubscription) Err() <-chan error { + errChan := make(chan error) + return errChan +} diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go new file mode 100644 index 00000000..5ff9df42 --- /dev/null +++ b/pkg/super_node/btc/types.go @@ -0,0 +1,29 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package btc + +import ( + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +// BlockPayload packages the block and tx data received from block connection notifications +type BlockPayload struct { + Height int32 + Header *wire.BlockHeader + Txs []*btcutil.Tx +} \ No newline at end of file diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index ee384fd8..f961b88c 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -18,6 +18,8 @@ package super_node import ( "fmt" + "github.com/btcsuite/btcd/rpcclient" + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" @@ -67,10 +69,17 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl ethClient, ok := client.(core.RPCClient) if !ok { var expectedClientType core.RPCClient - return nil, nil, fmt.Errorf("ethereum payload constructor expected client type %T got %T", expectedClientType, client) + return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, client) } streamChan := make(chan interface{}, eth.PayloadChanBufferSize) return eth.NewPayloadStreamer(ethClient), streamChan, nil + case config.Bitcoin: + btcClientConn, ok := client.(*rpcclient.ConnConfig) + if !ok { + return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client) + } + streamChan := make(chan interface{}, btc.PayloadChanBufferSize) + return btc.NewPayloadStreamer(btcClientConn), streamChan, nil default: return nil, nil, fmt.Errorf("invalid chain %T for streamer constructor", chain) } diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index dbd35045..e86b7fd1 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -31,14 +31,15 @@ type PayloadConverter struct { chainConfig *params.ChainConfig } -// NewPayloadConverter creates a pointer to a new Converter which satisfies the PayloadConverter interface +// NewPayloadConverter creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter { return &PayloadConverter{ chainConfig: chainConfig, } } -// Convert method is used to convert a geth statediff.Payload to a IPLDPayload +// Convert method is used to convert a eth statediff.Payload to an IPLDPayload +// Satisfies the shared.PayloadConverter interface func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { @@ -60,7 +61,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { TotalDifficulty: stateDiffPayload.TotalDifficulty, Block: block, HeaderRLP: headerRlp, - TrxMetaData: make([]TxModel, 0, trxLen), + TxMetaData: make([]TxModel, 0, trxLen), Receipts: make(types.Receipts, 0, trxLen), ReceiptMetaData: make([]ReceiptModel, 0, trxLen), StateNodes: make([]TrieNode, 0), @@ -81,7 +82,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { Index: int64(i), } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody - convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta) + convertedPayload.TxMetaData = append(convertedPayload.TxMetaData, txMeta) } // Decode receipts for this block diff --git a/pkg/super_node/eth/converter_test.go b/pkg/super_node/eth/converter_test.go index a9b04902..cff68b15 100644 --- a/pkg/super_node/eth/converter_test.go +++ b/pkg/super_node/eth/converter_test.go @@ -45,7 +45,7 @@ var _ = Describe("Converter", func() { Expect(err).ToNot(HaveOccurred()) Expect(gotBody).To(Equal(expectedBody)) Expect(convertedPayload.HeaderRLP).To(Equal(mocks.MockHeaderRlp)) - Expect(convertedPayload.TrxMetaData).To(Equal(mocks.MockTrxMeta)) + Expect(convertedPayload.TxMetaData).To(Equal(mocks.MockTrxMeta)) Expect(convertedPayload.ReceiptMetaData).To(Equal(mocks.MockRctMeta)) }) }) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 2b71d433..5048c7e7 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -102,7 +102,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, respons trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) if !trxFilter.Off { for i, trx := range payload.Block.Body().Transactions { - if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { + if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) if err := trx.EncodeRLP(trxBuffer); err != nil { return nil, err diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index c17a10d4..a328f25c 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -238,7 +238,7 @@ var ( Block: MockBlock, Receipts: MockReceipts, HeaderRLP: MockHeaderRlp, - TrxMetaData: MockTrxMeta, + TxMetaData: MockTrxMeta, ReceiptMetaData: MockRctMeta, StorageNodes: MockStorageNodes, StateNodes: MockStateNodes, diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 96c6eb7c..128125bd 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -95,7 +95,7 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) { } // Process and publish transactions - transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body(), ipldPayload.TrxMetaData) + transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body(), ipldPayload.TxMetaData) if err != nil { return nil, err } diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index d3ae236a..e0ad3ff7 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -17,8 +17,8 @@ package eth import ( - "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/eth/core" ) @@ -32,7 +32,7 @@ type PayloadStreamer struct { Client core.RPCClient } -// NewPayloadStreamer creates a pointer to a new StateDiffStreamer which satisfies the PayloadStreamer interface +// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { return &PayloadStreamer{ Client: client, @@ -40,7 +40,8 @@ func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { } // Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *PayloadStreamer) Stream(payloadChan chan interface{}) (*rpc.ClientSubscription, error) { +// Satisfies the shared.PayloadStreamer interface +func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) { logrus.Info("streaming diffs from geth") - return sds.Client.Subscribe("statediff", payloadChan, "stream") + return ps.Client.Subscribe("statediff", payloadChan, "stream") } diff --git a/pkg/super_node/eth/streamer_test.go b/pkg/super_node/eth/streamer_test.go index bd18c434..cc202d5f 100644 --- a/pkg/super_node/eth/streamer_test.go +++ b/pkg/super_node/eth/streamer_test.go @@ -18,7 +18,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/eth/fakes" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" ) diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 803d93f1..48d01c12 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -32,7 +32,7 @@ type IPLDPayload struct { TotalDifficulty *big.Int Block *types.Block HeaderRLP []byte - TrxMetaData []TxModel + TxMetaData []TxModel Receipts types.Receipts ReceiptMetaData []ReceiptModel StateNodes []TrieNode diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index a217817b..1364e0be 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -16,21 +16,37 @@ package shared -import ( - "github.com/ethereum/go-ethereum/rpc" -) - -// ResponseFilterer applies a filter to the streamed payload and returns a subscription response packet -type ResponseFilterer interface { - Filter(filter, payload interface{}) (response interface{}, err error) +// PayloadStreamer streams chain-specific payloads to the provided channel +type PayloadStreamer interface { + Stream(payloadChan chan interface{}) (ClientSubscription, error) } -// CIDIndexer indexes a set of cids with their associated meta data in Postgres +// PayloadFetcher fetches chain-specific payloads +type PayloadFetcher interface { + FetchAt(blockHeights []uint64) ([]interface{}, error) +} + +// PayloadConverter converts chain-specific payloads into IPLD payloads for publishing +type PayloadConverter interface { + Convert(payload interface{}) (interface{}, error) +} + +// IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing +type IPLDPublisher interface { + Publish(payload interface{}) (interface{}, error) +} + +// CIDIndexer indexes a CID payload in Postgres type CIDIndexer interface { Index(cids interface{}) error } -// CIDRetriever retrieves cids according to a provided filter and returns a cid +// ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet +type ResponseFilterer interface { + Filter(filter, payload interface{}) (response interface{}, err error) +} + +// CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper type CIDRetriever interface { Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) RetrieveFirstBlockNumber() (int64, error) @@ -38,26 +54,18 @@ type CIDRetriever interface { RetrieveGapsInData() ([]Gap, error) } -type PayloadStreamer interface { - Stream(payloadChan chan interface{}) (*rpc.ClientSubscription, error) -} - -type PayloadFetcher interface { - FetchAt(blockHeights []uint64) ([]interface{}, error) -} - +// IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper type IPLDFetcher interface { Fetch(cids interface{}) (interface{}, error) } -type PayloadConverter interface { - Convert(payload interface{}) (interface{}, error) -} - -type IPLDPublisher interface { - Publish(payload interface{}) (interface{}, error) -} - +// IPLDResolver resolves an IPLD wrapper into chain-specific payloads type IPLDResolver interface { Resolve(iplds interface{}) (interface{}, error) } + +// ClientSubscription is a general interface for chain data subscriptions +type ClientSubscription interface { + Err() <-chan error + Unsubscribe() +}