btc payload streamer

This commit is contained in:
Ian Norden 2020-01-29 14:37:20 -06:00
parent da844b0b83
commit 076903b174
14 changed files with 179 additions and 40 deletions

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015
github.com/bren2010/proquint v0.0.0-20160323162903-38337c27106d
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/btcsuite/goleveldb v1.0.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v3 v3.0.0

4
go.sum
View File

@ -47,14 +47,18 @@ github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcug
github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 h1:mOg8/RgDSHTQ1R0IR+LMDuW4TDShPv+JzYHuR4GLoNA=
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d h1:yJzD/yFppdVCf6ApMkVy8cUxV0XrxdP9rVf6D87/Mng=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=

View File

@ -0,0 +1,86 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
const (
PayloadChanBufferSize = 20000 // the max eth sub buffer size
)
// PayloadStreamer satisfies the PayloadStreamer interface for bitcoin
type PayloadStreamer struct {
Config *rpcclient.ConnConfig
}
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for bitcoin
func NewPayloadStreamer(clientConfig *rpcclient.ConnConfig) *PayloadStreamer {
return &PayloadStreamer{
Config: clientConfig,
}
}
// Stream is the main loop for subscribing to data from the btc block notifications
// Satisfies the shared.PayloadStreamer interface
func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) {
logrus.Info("streaming block payloads from btc")
blockNotificationHandler := rpcclient.NotificationHandlers{
// Notification handler for block connections, forwards new block data to the payloadChan
OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) {
payloadChan <- BlockPayload{
Height: height,
Header: header,
Txs: txs,
}
},
}
// Create a new client, and connect to btc ws server
client, err := rpcclient.New(ps.Config, &blockNotificationHandler)
if err != nil {
return nil, err
}
// Register for block connect notifications.
if err := client.NotifyBlocks(); err != nil {
return nil, err
}
client.WaitForShutdown()
return &ClientSubscription{client: client}, nil
}
// ClientSubscription is a wrapper around the underlying btcd rpc client
// to fit the shared.ClientSubscription interface
type ClientSubscription struct {
client *rpcclient.Client
}
// Unsubscribe satisfies the rpc.Subscription interface
func (bcs *ClientSubscription) Unsubscribe() {
bcs.client.Shutdown()
}
// Err() satisfies the rpc.Subscription interface with a dummy err channel
func (bcs *ClientSubscription) Err() <-chan error {
errChan := make(chan error)
return errChan
}

View File

@ -0,0 +1,29 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
)
// BlockPayload packages the block and tx data received from block connection notifications
type BlockPayload struct {
Height int32
Header *wire.BlockHeader
Txs []*btcutil.Tx
}

View File

@ -18,6 +18,8 @@ package super_node
import (
"fmt"
"github.com/btcsuite/btcd/rpcclient"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
@ -67,10 +69,17 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl
ethClient, ok := client.(core.RPCClient)
if !ok {
var expectedClientType core.RPCClient
return nil, nil, fmt.Errorf("ethereum payload constructor expected client type %T got %T", expectedClientType, client)
return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, client)
}
streamChan := make(chan interface{}, eth.PayloadChanBufferSize)
return eth.NewPayloadStreamer(ethClient), streamChan, nil
case config.Bitcoin:
btcClientConn, ok := client.(*rpcclient.ConnConfig)
if !ok {
return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client)
}
streamChan := make(chan interface{}, btc.PayloadChanBufferSize)
return btc.NewPayloadStreamer(btcClientConn), streamChan, nil
default:
return nil, nil, fmt.Errorf("invalid chain %T for streamer constructor", chain)
}

View File

@ -31,14 +31,15 @@ type PayloadConverter struct {
chainConfig *params.ChainConfig
}
// NewPayloadConverter creates a pointer to a new Converter which satisfies the PayloadConverter interface
// NewPayloadConverter creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter {
return &PayloadConverter{
chainConfig: chainConfig,
}
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
// Convert method is used to convert a eth statediff.Payload to an IPLDPayload
// Satisfies the shared.PayloadConverter interface
func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
stateDiffPayload, ok := payload.(statediff.Payload)
if !ok {
@ -60,7 +61,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
TotalDifficulty: stateDiffPayload.TotalDifficulty,
Block: block,
HeaderRLP: headerRlp,
TrxMetaData: make([]TxModel, 0, trxLen),
TxMetaData: make([]TxModel, 0, trxLen),
Receipts: make(types.Receipts, 0, trxLen),
ReceiptMetaData: make([]ReceiptModel, 0, trxLen),
StateNodes: make([]TrieNode, 0),
@ -81,7 +82,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
Index: int64(i),
}
// txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody
convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta)
convertedPayload.TxMetaData = append(convertedPayload.TxMetaData, txMeta)
}
// Decode receipts for this block

View File

@ -45,7 +45,7 @@ var _ = Describe("Converter", func() {
Expect(err).ToNot(HaveOccurred())
Expect(gotBody).To(Equal(expectedBody))
Expect(convertedPayload.HeaderRLP).To(Equal(mocks.MockHeaderRlp))
Expect(convertedPayload.TrxMetaData).To(Equal(mocks.MockTrxMeta))
Expect(convertedPayload.TxMetaData).To(Equal(mocks.MockTrxMeta))
Expect(convertedPayload.ReceiptMetaData).To(Equal(mocks.MockRctMeta))
})
})

View File

@ -102,7 +102,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, respons
trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions))
if !trxFilter.Off {
for i, trx := range payload.Block.Body().Transactions {
if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) {
if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
trxBuffer := new(bytes.Buffer)
if err := trx.EncodeRLP(trxBuffer); err != nil {
return nil, err

View File

@ -238,7 +238,7 @@ var (
Block: MockBlock,
Receipts: MockReceipts,
HeaderRLP: MockHeaderRlp,
TrxMetaData: MockTrxMeta,
TxMetaData: MockTrxMeta,
ReceiptMetaData: MockRctMeta,
StorageNodes: MockStorageNodes,
StateNodes: MockStateNodes,

View File

@ -95,7 +95,7 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
}
// Process and publish transactions
transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body(), ipldPayload.TrxMetaData)
transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body(), ipldPayload.TxMetaData)
if err != nil {
return nil, err
}

View File

@ -17,8 +17,8 @@
package eth
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
)
@ -32,7 +32,7 @@ type PayloadStreamer struct {
Client core.RPCClient
}
// NewPayloadStreamer creates a pointer to a new StateDiffStreamer which satisfies the PayloadStreamer interface
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer {
return &PayloadStreamer{
Client: client,
@ -40,7 +40,8 @@ func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer {
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *PayloadStreamer) Stream(payloadChan chan interface{}) (*rpc.ClientSubscription, error) {
// Satisfies the shared.PayloadStreamer interface
func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) {
logrus.Info("streaming diffs from geth")
return sds.Client.Subscribe("statediff", payloadChan, "stream")
return ps.Client.Subscribe("statediff", payloadChan, "stream")
}

View File

@ -18,7 +18,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/eth/fakes"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)

View File

@ -32,7 +32,7 @@ type IPLDPayload struct {
TotalDifficulty *big.Int
Block *types.Block
HeaderRLP []byte
TrxMetaData []TxModel
TxMetaData []TxModel
Receipts types.Receipts
ReceiptMetaData []ReceiptModel
StateNodes []TrieNode

View File

@ -16,21 +16,37 @@
package shared
import (
"github.com/ethereum/go-ethereum/rpc"
)
// ResponseFilterer applies a filter to the streamed payload and returns a subscription response packet
type ResponseFilterer interface {
Filter(filter, payload interface{}) (response interface{}, err error)
// PayloadStreamer streams chain-specific payloads to the provided channel
type PayloadStreamer interface {
Stream(payloadChan chan interface{}) (ClientSubscription, error)
}
// CIDIndexer indexes a set of cids with their associated meta data in Postgres
// PayloadFetcher fetches chain-specific payloads
type PayloadFetcher interface {
FetchAt(blockHeights []uint64) ([]interface{}, error)
}
// PayloadConverter converts chain-specific payloads into IPLD payloads for publishing
type PayloadConverter interface {
Convert(payload interface{}) (interface{}, error)
}
// IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing
type IPLDPublisher interface {
Publish(payload interface{}) (interface{}, error)
}
// CIDIndexer indexes a CID payload in Postgres
type CIDIndexer interface {
Index(cids interface{}) error
}
// CIDRetriever retrieves cids according to a provided filter and returns a cid
// ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet
type ResponseFilterer interface {
Filter(filter, payload interface{}) (response interface{}, err error)
}
// CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper
type CIDRetriever interface {
Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error)
RetrieveFirstBlockNumber() (int64, error)
@ -38,26 +54,18 @@ type CIDRetriever interface {
RetrieveGapsInData() ([]Gap, error)
}
type PayloadStreamer interface {
Stream(payloadChan chan interface{}) (*rpc.ClientSubscription, error)
}
type PayloadFetcher interface {
FetchAt(blockHeights []uint64) ([]interface{}, error)
}
// IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper
type IPLDFetcher interface {
Fetch(cids interface{}) (interface{}, error)
}
type PayloadConverter interface {
Convert(payload interface{}) (interface{}, error)
}
type IPLDPublisher interface {
Publish(payload interface{}) (interface{}, error)
}
// IPLDResolver resolves an IPLD wrapper into chain-specific payloads
type IPLDResolver interface {
Resolve(iplds interface{}) (interface{}, error)
}
// ClientSubscription is a general interface for chain data subscriptions
type ClientSubscription interface {
Err() <-chan error
Unsubscribe()
}