emulate btc data streamer over http; misc fixes

This commit is contained in:
Ian Norden 2020-02-10 11:16:57 -06:00
parent 5173edf563
commit ef3b043f97
21 changed files with 201 additions and 94 deletions

View File

@ -104,6 +104,6 @@ func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConf
if err != nil {
return err
}
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{"eth"}, nil, nil, rpc.HTTPTimeouts{})
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{"eth", "btc"}, nil, nil, rpc.HTTPTimeouts{})
return err
}

View File

@ -1,11 +1,12 @@
-- +goose Up
CREATE TABLE btc.tx_inputs (
id SERIAL PRIMARY KEY,
tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
index INTEGER NOT NULL,
witness BYTEA[],
sig_script BYTEA NOT NULL,
outpoint_id INTEGER REFERENCES btc.tx_outputs (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
id SERIAL PRIMARY KEY,
tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
index INTEGER NOT NULL,
witness VARCHAR[],
sig_script BYTEA NOT NULL,
outpoint_tx_hash VARCHAR(66) NOT NULL,
outpoint_index NUMERIC NOT NULL,
UNIQUE (tx_id, index)
);

View File

@ -111,9 +111,10 @@ CREATE TABLE btc.tx_inputs (
id integer NOT NULL,
tx_id integer NOT NULL,
index integer NOT NULL,
witness bytea[],
witness character varying[],
sig_script bytea NOT NULL,
outpoint_id integer
outpoint_tx_hash character varying(66) NOT NULL,
outpoint_index numeric NOT NULL
);
@ -1288,14 +1289,6 @@ ALTER TABLE ONLY btc.transaction_cids
ADD CONSTRAINT transaction_cids_header_id_fkey FOREIGN KEY (header_id) REFERENCES btc.header_cids(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;
--
-- Name: tx_inputs tx_inputs_outpoint_id_fkey; Type: FK CONSTRAINT; Schema: btc; Owner: -
--
ALTER TABLE ONLY btc.tx_inputs
ADD CONSTRAINT tx_inputs_outpoint_id_fkey FOREIGN KEY (outpoint_id) REFERENCES btc.tx_outputs(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;
--
-- Name: tx_inputs tx_inputs_tx_id_fkey; Type: FK CONSTRAINT; Schema: btc; Owner: -
--

View File

@ -34,14 +34,16 @@ FROM alpine
WORKDIR /app
ARG USER
ARG config_file=environments/superNode.toml
ARG CONFIG_FILE
ARG EXPOSE_PORT_1
ARG EXPOSE_PORT_2
RUN adduser -D 5000 $USER
USER $USER
# chown first so dir is writable
# note: using $USER is merged, but not in the stable release yet
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$config_file config.toml
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/$CONFIG_FILE config.toml
COPY --chown=5000:5000 --from=builder /go/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node/startup_script.sh .
# keep binaries immutable
@ -50,6 +52,7 @@ COPY --from=builder /go/src/github.com/pressly/goose/cmd/goose/goose goose
COPY --from=builder /go/src/github.com/vulcanize/vulcanizedb/db/migrations migrations/vulcanizedb
COPY --from=builder /go/src/github.com/ipfs/go-ipfs/ipfs ipfs
EXPOSE 8080
EXPOSE $EXPOSE_PORT_1
EXPOSE $EXPOSE_PORT_2
CMD ["./startup_script.sh"]

View File

@ -126,13 +126,14 @@ The config file contains the parameters needed to initialize a SuperNode with th
[superNode.server]
on = true
ipcPath = "/root/.vulcanize/vulcanize.ipc"
ipcPath = "/root/.vulcanize/eth/vulcanize.ipc"
wsPath = "127.0.0.1:8080"
httpPath = "127.0.0.1:8081"
[superNode.backFill]
on = false
httpPath = ""
frequency = 5
on = true
httpPath = "http://127.0.0.1:8545"
frequency = 15
batchSize = 50
```
@ -210,6 +211,6 @@ createdb vulcanize_public
8. Build and run the Docker image
```
cd $GOPATH/src/github.com/vulcanize/vulcanizedb/dockerfiles/super_node
docker build .
docker build --build-arg CONFIG_FILE=environments/ethSuperNode.toml --build-arg EXPOSE_PORT_1=8080 --build-arg EXPOSE_PORT_2=8081 .
docker run --network host -e IPFS_INIT=true -e VDB_PG_NAME=vulcanize_public -e VDB_PG_HOSTNAME=localhost -e VDB_PG_PORT=5432 -e VDB_PG_USER=postgres -e VDB_PG_PASSWORD=password {IMAGE_ID}
```

View File

@ -10,8 +10,10 @@
[superNode.sync]
on = true
wsPath = "http://127.0.0.1:8332"
wsPath = "127.0.0.1:8332"
workers = 1
pass = "password"
user = "username"
[superNode.server]
on = true
@ -21,6 +23,14 @@
[superNode.backFill]
on = true
httpPath = "http://127.0.0.1:8332"
frequency = 5
batchSize = 50
httpPath = "127.0.0.1:8332"
frequency = 15
batchSize = 50
pass = "password"
user = "username"
[superNode.btc]
nodeID = "ocd0"
clientName = "Omnicore"
genesisBlock = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
networkID = "0xD9B4BEF9"

View File

@ -22,5 +22,5 @@
[superNode.backFill]
on = true
httpPath = "http://127.0.0.1:8545"
frequency = 5
frequency = 15
batchSize = 50

View File

@ -61,10 +61,9 @@ var _ = Describe("Reading from the Geth blockchain", func() {
It("retrieves the node info", func(done Done) {
node := blockChain.Node()
mainnetID := float64(1)
Expect(node.GenesisBlock).ToNot(BeNil())
Expect(node.NetworkID).To(Equal(mainnetID))
Expect(node.NetworkID).To(Equal("1.000000"))
Expect(len(node.ID)).ToNot(BeZero())
Expect(node.ClientName).ToNot(BeZero())

View File

@ -114,9 +114,9 @@ func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup, quitChan <-chan bool) {
log.Error(err)
continue
}
if startingBlock != 1 {
if startingBlock != 0 {
log.Info("found gap at the beginning of the sync")
bfs.fillGaps(1, uint64(startingBlock-1))
bfs.fillGaps(0, uint64(startingBlock-1))
}
gaps, err := bfs.Retriever.RetrieveGapsInData()
if err != nil {

View File

@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockRetriever := &mocks2.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{
{
Start: 100, Stop: 101,
@ -99,7 +99,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockRetriever := &mocks2.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
FirstBlockNumberToReturn: 0,
GapsToRetrieve: []shared.Gap{
{
Start: 100, Stop: 100,
@ -183,7 +183,7 @@ var _ = Describe("BackFiller", func() {
Expect(mockConverter.PassedStatediffPayload[1]).To(Equal(mocks.MockStateDiffPayload))
Expect(mockRetriever.CalledTimes).To(Equal(1))
Expect(len(mockFetcher.CalledAtBlockHeights)).To(Equal(1))
Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{1, 2}))
Expect(mockFetcher.CalledAtBlockHeights[0]).To(Equal([]uint64{0, 1, 2}))
})
})
})

View File

@ -17,6 +17,7 @@
package btc
import (
"encoding/hex"
"fmt"
"github.com/btcsuite/btcd/chaincfg"
@ -62,7 +63,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
SignatureScript: in.SignatureScript,
PreviousOutPointHash: in.PreviousOutPoint.Hash.String(),
PreviousOutPointIndex: in.PreviousOutPoint.Index,
TxWitness: in.Witness,
TxWitness: convertBytesToHexArray(in.Witness),
}
}
for i, out := range tx.MsgTx().TxOut {
@ -91,3 +92,11 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
TxMetaData: txMeta,
}, nil
}
func convertBytesToHexArray(bytea [][]byte) []string {
var strs []string
for _, b := range bytea {
strs = append(strs, hex.EncodeToString(b))
}
return strs
}

View File

@ -0,0 +1,104 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"bytes"
"time"
"github.com/btcsuite/btcd/rpcclient"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// HTTPPayloadStreamer satisfies the PayloadStreamer interface for bitcoin over http endpoints (since bitcoin core doesn't support websockets)
type HTTPPayloadStreamer struct {
Config *rpcclient.ConnConfig
lastHash []byte
}
// NewHTTPPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for bitcoin
func NewHTTPPayloadStreamer(clientConfig *rpcclient.ConnConfig) *HTTPPayloadStreamer {
return &HTTPPayloadStreamer{
Config: clientConfig,
}
}
// Stream is the main loop for subscribing to data from the btc block notifications
// Satisfies the shared.PayloadStreamer interface
func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
logrus.Info("streaming block payloads from btc")
client, err := rpcclient.New(ps.Config, nil)
if err != nil {
return nil, err
}
ticker := time.NewTicker(time.Second * 5)
errChan := make(chan error)
go func() {
for {
// start at
select {
case <-ticker.C:
height, err := client.GetBlockCount()
if err != nil {
errChan <- err
continue
}
blockHash, err := client.GetBlockHash(height)
if err != nil {
errChan <- err
continue
}
blockHashBytes := blockHash.CloneBytes()
if bytes.Equal(blockHashBytes, ps.lastHash) {
continue
}
ps.lastHash = blockHashBytes
block, err := client.GetBlock(blockHash)
if err != nil {
errChan <- err
continue
}
payloadChan <- BlockPayload{
Header: &block.Header,
Height: height,
Txs: msgTxsToUtilTxs(block.Transactions),
}
default:
}
}
}()
return &HTTPClientSubscription{client: client, errChan: errChan}, nil
}
// HTTPClientSubscription is a wrapper around the underlying bitcoind rpc client
// to fit the shared.ClientSubscription interface
type HTTPClientSubscription struct {
client *rpcclient.Client
errChan chan error
}
// Unsubscribe satisfies the rpc.Subscription interface
func (bcs *HTTPClientSubscription) Unsubscribe() {
bcs.client.Shutdown()
}
// Err() satisfies the rpc.Subscription interface
func (bcs *HTTPClientSubscription) Err() <-chan error {
return bcs.errChan
}

View File

@ -103,29 +103,10 @@ func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModelWithIn
}
func (in *CIDIndexer) indexTxInput(tx *sqlx.Tx, txInput TxInput, txID int64) error {
// resolve zero-value hash to null value (coinbase tx input with no referenced outputs)
if txInput.PreviousOutPointHash == "0000000000000000000000000000000000000000000000000000000000000000" {
_, err := tx.Exec(`INSERT INTO btc.tx_inputs (tx_id, index, witness, sig_script, outpoint_id)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_id, index) DO UPDATE SET (witness, sig_script, outpoint_id) = ($3, $4, $5)`,
txID, txInput.Index, pq.Array(txInput.TxWitness), txInput.SignatureScript, nil)
return err
}
var referencedOutPutID int64
if err := tx.Get(&referencedOutPutID, `SELECT tx_outputs.id FROM btc.transaction_cids, btc.tx_outputs
WHERE tx_outputs.tx_id = transaction_cids.id
AND tx_hash = $1
AND tx_outputs.index = $2`, txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex); err != nil {
logrus.Errorf("btc indexer could not find the tx output (tx hash %s, output index %d) referenced in tx input %d of tx id %d", txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex, txInput.Index, txID)
return err
}
if referencedOutPutID == 0 {
return fmt.Errorf("btc indexer could not find the tx output (tx hash %s, output index %d) referenced in tx input %d of tx id %d", txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex, txInput.Index, txID)
}
_, err := tx.Exec(`INSERT INTO btc.tx_inputs (tx_id, index, witness, sig_script, outpoint_id)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tx_id, index) DO UPDATE SET (witness, sig_script, outpoint_id) = ($3, $4, $5)`,
txID, txInput.Index, pq.Array(txInput.TxWitness), txInput.SignatureScript, referencedOutPutID)
_, err := tx.Exec(`INSERT INTO btc.tx_inputs (tx_id, index, witness, sig_script, outpoint_tx_hash, outpoint_index)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (tx_id, index) DO UPDATE SET (witness, sig_script, outpoint_tx_hash, outpoint_index) = ($3, $4, $5, $6)`,
txID, txInput.Index, pq.Array(txInput.TxWitness), txInput.SignatureScript, txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex)
return err
}

View File

@ -43,8 +43,6 @@ var _ = Describe("Indexer", func() {
Describe("Index", func() {
It("Indexes CIDs and related metadata into vulcanizedb", func() {
err = repo.Index(&mocks.DummyCIDPayloadForFKReference)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT * FROM btc.header_cids

View File

@ -29,7 +29,7 @@ import (
)
var (
MockBlockHeight int32 = 1337
MockBlockHeight int64 = 1337
MockBlock = wire.MsgBlock{
Header: wire.BlockHeader{
Version: 1,

View File

@ -59,11 +59,10 @@ type TxInput struct {
ID int64 `db:"id"`
TxID int64 `db:"tx_id"`
Index int64 `db:"index"`
TxWitness [][]byte `db:"witness"`
TxWitness []string `db:"witness"`
SignatureScript []byte `db:"sig_script"`
PreviousOutPointID int64 `db:"outpoint_id"`
PreviousOutPointIndex uint32
PreviousOutPointHash string
PreviousOutPointIndex uint32 `db:"outpoint_tx_hash"`
PreviousOutPointHash string `db:"outpoint_index"`
}
// TxOutput is the db model for btc.tx_outputs table

View File

@ -55,7 +55,7 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain
return nil, err
}
blockPayloads[i] = BlockPayload{
Height: int32(height),
Height: int64(height),
Header: &block.Header,
Txs: msgTxsToUtilTxs(block.Transactions),
}

View File

@ -49,7 +49,7 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
// Notification handler for block connections, forwards new block data to the payloadChan
OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) {
payloadChan <- BlockPayload{
Height: height,
Height: int64(height),
Header: header,
Txs: txs,
}

View File

@ -27,7 +27,7 @@ import (
// BlockPayload packages the block and tx data received from block connection notifications
type BlockPayload struct {
Height int32
Height int64
Header *wire.BlockHeader
Txs []*btcutil.Tx
}

View File

@ -85,7 +85,7 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha
return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, clientOrConfig)
}
streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize)
return btc.NewPayloadStreamer(btcClientConn), streamChan, nil
return btc.NewHTTPPayloadStreamer(btcClientConn), streamChan, nil
default:
return nil, nil, fmt.Errorf("invalid chain %s for streamer constructor", chain.String())
}

View File

@ -17,13 +17,14 @@
package shared
import (
"os"
"path/filepath"
"time"
"github.com/btcsuite/btcd/rpcclient"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/spf13/viper"
"os"
"path/filepath"
"time"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/eth"
@ -92,19 +93,23 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
workers = 1
}
sn.Workers = workers
if sn.Chain == Ethereum {
sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(sn.Chain, viper.GetString("superNode.sync.wsPath"))
}
if sn.Chain == Bitcoin {
switch sn.Chain {
case Ethereum:
sn.NodeInfo, sn.WSClient, err = getEthNodeAndClient(viper.GetString("superNode.sync.wsPath"))
case Bitcoin:
sn.NodeInfo = core.Node{
ID: "temporaryID",
ClientName: "omnicored",
GenesisBlock: "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f",
NetworkID: "0xD9B4BEF9",
ID: viper.GetString("superNode.btc.nodeID"),
ClientName: viper.GetString("superNode.btc.clientName"),
GenesisBlock: viper.GetString("superNode.btc.genesisBlock"),
NetworkID: viper.GetString("superNode.btc.networkID"),
}
// For bitcoin we load in node info from the config because there is no RPC endpoint to retrieve this from the node
sn.WSClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.sync.wsPath"),
Endpoint: "ws",
Host: viper.GetString("superNode.sync.wsPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.sync.pass"),
User: viper.GetString("superNode.sync.user"),
}
}
}
@ -125,7 +130,7 @@ func NewSuperNodeConfig() (*SuperNodeConfig, error) {
sn.IPCEndpoint = ipcPath
httpPath := viper.GetString("superNode.server.httpPath")
if httpPath == "" {
httpPath = "http://127.0.0.1:8547"
httpPath = "http://127.0.0.1:8545"
}
sn.HTTPEndpoint = httpPath
}
@ -145,31 +150,35 @@ func (sn *SuperNodeConfig) BackFillFields() error {
sn.BackFill = true
var httpClient interface{}
var err error
if sn.Chain == Ethereum {
_, httpClient, err = getEthNodeAndClient(sn.Chain, viper.GetString("superNode.backFill.httpPath"))
switch sn.Chain {
case Ethereum:
_, httpClient, err = getEthNodeAndClient(viper.GetString("superNode.backFill.httpPath"))
if err != nil {
return err
}
}
if sn.Chain == Bitcoin {
case Bitcoin:
httpClient = &rpcclient.ConnConfig{
Host: viper.GetString("superNode.backFill.httpPath"),
Host: viper.GetString("superNode.backFill.httpPath"),
HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode
DisableTLS: true, // Bitcoin core does not provide TLS by default
Pass: viper.GetString("superNode.backFill.pass"),
User: viper.GetString("superNode.backFill.user"),
}
}
sn.HTTPClient = httpClient
freq := viper.GetInt("superNode.backFill.frequency")
var frequency time.Duration
if freq <= 0 {
frequency = time.Minute * 5
frequency = time.Second * 30
} else {
frequency = time.Duration(freq)
frequency = time.Second * time.Duration(freq)
}
sn.Frequency = frequency
sn.BatchSize = uint64(viper.GetInt64("superNode.backFill.batchSize"))
return nil
}
func getEthNodeAndClient(chain ChainType, path string) (core.Node, interface{}, error) {
func getEthNodeAndClient(path string) (core.Node, interface{}, error) {
rawRPCClient, err := rpc.Dial(path)
if err != nil {
return core.Node{}, nil, err