btc cid indexer
This commit is contained in:
parent
5094b975fc
commit
808f1b5662
@ -4,8 +4,7 @@ CREATE TABLE btc.tx_inputs (
|
||||
tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||
index INTEGER NOT NULL,
|
||||
tx_witness BYTEA[],
|
||||
sequence INTEGER NOT NULL,
|
||||
script BYTEA NOT NULL,
|
||||
sig_script BYTEA NOT NULL,
|
||||
outpoint_hash VARCHAR(66) NOT NULL,
|
||||
outpoint_index INTEGER NOT NULL,
|
||||
UNIQUE (tx_id, index)
|
||||
|
@ -4,7 +4,7 @@ CREATE TABLE btc.tx_outputs (
|
||||
tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
|
||||
index INTEGER NOT NULL,
|
||||
value INTEGER NOT NULL,
|
||||
script BYTEA NOT NULL,
|
||||
pk_script BYTEA NOT NULL,
|
||||
UNIQUE (tx_id, index)
|
||||
);
|
||||
|
||||
|
@ -37,17 +37,35 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload)
|
||||
}
|
||||
txMeta := make([]TxModel, len(btcBlockPayload.Txs))
|
||||
txMeta := make([]TxModelWithInsAndOuts, len(btcBlockPayload.Txs))
|
||||
for _, tx := range btcBlockPayload.Txs {
|
||||
index := tx.Index()
|
||||
txModel := TxModel{
|
||||
txModel := TxModelWithInsAndOuts{
|
||||
TxHash: tx.Hash().String(),
|
||||
Index: int64(tx.Index()),
|
||||
HasWitness: tx.HasWitness(),
|
||||
TxOutputs: make([]TxOutput, len(tx.MsgTx().TxOut)),
|
||||
TxInputs: make([]TxInput, len(tx.MsgTx().TxIn)),
|
||||
}
|
||||
if tx.HasWitness() {
|
||||
txModel.WitnessHash = tx.WitnessHash().String()
|
||||
}
|
||||
for i, in := range tx.MsgTx().TxIn {
|
||||
txModel.TxInputs[i] = TxInput{
|
||||
Index: int64(i),
|
||||
SignatureScript: in.SignatureScript,
|
||||
PreviousOutPointHash: in.PreviousOutPoint.Hash.String(),
|
||||
PreviousOutPointIndex: in.PreviousOutPoint.Index,
|
||||
TxWitness: in.Witness,
|
||||
}
|
||||
}
|
||||
for i, out := range tx.MsgTx().TxOut {
|
||||
txModel.TxOutputs[i] = TxOutput{
|
||||
Index: int64(i),
|
||||
Value: out.Value,
|
||||
PkScript: out.PkScript,
|
||||
}
|
||||
}
|
||||
txMeta[index] = txModel
|
||||
}
|
||||
return IPLDPayload{
|
||||
|
@ -15,3 +15,98 @@
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package btc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/lib/pq"
|
||||
|
||||
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
|
||||
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||
)
|
||||
|
||||
type CIDIndexer struct {
|
||||
db *postgres.DB
|
||||
}
|
||||
|
||||
func NewCIDIndexer(db *postgres.DB) *CIDIndexer {
|
||||
return &CIDIndexer{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
|
||||
cidWrapper, ok := cids.(*CIDPayload)
|
||||
if !ok {
|
||||
return fmt.Errorf("btc indexer expected cids type %T got %T", &CIDPayload{}, cids)
|
||||
}
|
||||
tx, err := in.db.Beginx()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headerID, err := in.indexHeaderCID(tx, cidWrapper.HeaderCID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := in.indexTransactionCIDs(tx, cidWrapper.TransactionCIDs, headerID); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
|
||||
var headerID int64
|
||||
err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, version, timestamp, bits)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, version, timestamp, bits) = ($3, $4, $5, $6, $7)
|
||||
RETURNING id`,
|
||||
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Version, header.Timestamp, header.Bits).Scan(&headerID)
|
||||
return headerID, err
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexTransactionCIDs(tx *sqlx.Tx, transactions []TxModelWithInsAndOuts, headerID int64) error {
|
||||
for _, transaction := range transactions {
|
||||
txID, err := in.indexTransactionCID(tx, transaction, headerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, input := range transaction.TxInputs {
|
||||
if err := in.indexTxInput(tx, input, txID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, output := range transaction.TxOutputs {
|
||||
if err := in.indexTxOutput(tx, output, txID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModelWithInsAndOuts, headerID int64) (int64, error) {
|
||||
var txID int64
|
||||
err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, has_witness, witness_hash)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (index, cid, has_witness, witness_hash) = ($3, $4, $5, $6)
|
||||
RETURNING id`,
|
||||
headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.HasWitness, transaction.WitnessHash).Scan(&txID)
|
||||
return txID, err
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexTxInput(tx *sqlx.Tx, txInput TxInput, txID int64) error {
|
||||
_, err := tx.Exec(`INSERT INTO btc.tx_inputs (tx_id, index, tx_witness, sig_script, outpoint_hash, outpoint_index)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
ON CONFLICT (tx_id, index) DO UPDATE SET (tx_witness, sig_script, outpoint_hash, outpoint_index) = ($3, $4, $5, $6)`,
|
||||
txID, txInput.Index, pq.Array(txInput.TxWitness), txInput.SignatureScript, txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex)
|
||||
return err
|
||||
}
|
||||
|
||||
func (in *CIDIndexer) indexTxOutput(tx *sqlx.Tx, txOuput TxOutput, txID int64) error {
|
||||
_, err := tx.Exec(`INSERT INTO btc.tx_outputs (tx_id, index, value, pk_script)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (ix_id, index) DO UPDATE SET (value, pk_script) = ($3, $4)`,
|
||||
txID, txOuput.Index, txOuput.Value, txOuput.PkScript)
|
||||
return err
|
||||
}
|
||||
|
@ -23,9 +23,9 @@ type HeaderModel struct {
|
||||
BlockHash string `db:"block_hash"`
|
||||
ParentHash string `db:"parent_hash"`
|
||||
CID string `db:"cid"`
|
||||
Version int64 `db:"version"`
|
||||
Version int32 `db:"version"`
|
||||
Timestamp int64 `db:"timestamp"`
|
||||
Bits int64 `db:"bits"`
|
||||
Bits uint32 `db:"bits"`
|
||||
}
|
||||
|
||||
// TxModel is the db model for btc.transaction_cids table
|
||||
@ -39,16 +39,28 @@ type TxModel struct {
|
||||
WitnessHash string `db:"witness_hash"`
|
||||
}
|
||||
|
||||
// TxModelWithInsAndOuts is the db model for btc.transaction_cids table that includes the children tx_input and tx_output tables
|
||||
type TxModelWithInsAndOuts struct {
|
||||
ID int64 `db:"id"`
|
||||
HeaderID int64 `db:"header_id"`
|
||||
Index int64 `db:"index"`
|
||||
TxHash string `db:"tx_hash"`
|
||||
CID string `db:"cid"`
|
||||
HasWitness bool `db:"has_witness"`
|
||||
WitnessHash string `db:"witness_hash"`
|
||||
TxInputs []TxInput
|
||||
TxOutputs []TxOutput
|
||||
}
|
||||
|
||||
// TxInput is the db model for btc.tx_inputs table
|
||||
type TxInput struct {
|
||||
ID int64 `db:"id"`
|
||||
TxID int64 `db:"tx_id"`
|
||||
Index int64 `db:"index"`
|
||||
TxWitness [][]byte `db:"tx_witness"`
|
||||
Sequence int64 `db:"sequence"`
|
||||
SignatureScript []byte `db:"script"`
|
||||
PreviousOutPointHash []byte `db:"outpoint_hash"`
|
||||
PreviousOutPointIndex int64 `db:"outpoint_index"`
|
||||
SignatureScript []byte `db:"sig_script"`
|
||||
PreviousOutPointHash string `db:"outpoint_hash"`
|
||||
PreviousOutPointIndex uint32 `db:"outpoint_index"`
|
||||
}
|
||||
|
||||
// TxOutput is the db model for btc.tx_outputs table
|
||||
@ -57,5 +69,5 @@ type TxOutput struct {
|
||||
TxID int64 `db:"tx_id"`
|
||||
Index int64 `db:"index"`
|
||||
Value int64 `db:"value"`
|
||||
PkScript []byte `db:"script"`
|
||||
PkScript []byte `db:"pk_script"`
|
||||
}
|
||||
|
@ -63,6 +63,9 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI
|
||||
ParentHash: ipldPayload.Header.PrevBlock.String(),
|
||||
BlockNumber: strconv.Itoa(int(ipldPayload.Height)),
|
||||
BlockHash: ipldPayload.Header.BlockHash().String(),
|
||||
Version: ipldPayload.Header.Version,
|
||||
Timestamp: ipldPayload.Header.Timestamp.UnixNano(),
|
||||
Bits: ipldPayload.Header.Bits,
|
||||
}
|
||||
// Process and publish transactions
|
||||
transactionCids, err := pub.publishTransactions(ipldPayload.Txs, ipldPayload.TxMetaData)
|
||||
@ -84,7 +87,7 @@ func (pub *IPLDPublisher) publishHeader(header *wire.BlockHeader) (string, error
|
||||
return cids[0], nil
|
||||
}
|
||||
|
||||
func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModel) ([]TxModel, error) {
|
||||
func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) {
|
||||
transactionCids, err := pub.TransactionPutter.DagPut(transactions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -92,14 +95,16 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMet
|
||||
if len(transactionCids) != len(trxMeta) {
|
||||
return nil, errors.New("expected one CID for each transaction")
|
||||
}
|
||||
mappedTrxCids := make([]TxModel, len(transactionCids))
|
||||
mappedTrxCids := make([]TxModelWithInsAndOuts, len(transactionCids))
|
||||
for i, cid := range transactionCids {
|
||||
mappedTrxCids[i] = TxModel{
|
||||
mappedTrxCids[i] = TxModelWithInsAndOuts{
|
||||
CID: cid,
|
||||
Index: trxMeta[i].Index,
|
||||
TxHash: trxMeta[i].TxHash,
|
||||
HasWitness: trxMeta[i].HasWitness,
|
||||
WitnessHash: trxMeta[i].WitnessHash,
|
||||
TxInputs: trxMeta[i].TxInputs,
|
||||
TxOutputs: trxMeta[i].TxOutputs,
|
||||
}
|
||||
}
|
||||
return mappedTrxCids, nil
|
||||
|
@ -39,7 +39,7 @@ type BlockPayload struct {
|
||||
// Passed to IPLDPublisher and ResponseFilterer
|
||||
type IPLDPayload struct {
|
||||
BlockPayload
|
||||
TxMetaData []TxModel
|
||||
TxMetaData []TxModelWithInsAndOuts
|
||||
}
|
||||
|
||||
func (ip IPLDPayload) Value() shared.StreamedIPLDs {
|
||||
@ -51,7 +51,7 @@ func (ip IPLDPayload) Value() shared.StreamedIPLDs {
|
||||
// Passed to CIDIndexer
|
||||
type CIDPayload struct {
|
||||
HeaderCID HeaderModel
|
||||
TransactionCIDs []TxModel
|
||||
TransactionCIDs []TxModelWithInsAndOuts
|
||||
}
|
||||
|
||||
// CIDWrapper is used to direct fetching of IPLDs from IPFS
|
||||
|
@ -48,6 +48,8 @@ func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer,
|
||||
switch chain {
|
||||
case config.Ethereum:
|
||||
return eth.NewCIDIndexer(db), nil
|
||||
case config.Bitcoin:
|
||||
return btc.NewCIDIndexer(db), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid chain %T for indexer constructor", chain)
|
||||
}
|
||||
@ -128,6 +130,8 @@ func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPubli
|
||||
switch chain {
|
||||
case config.Ethereum:
|
||||
return eth.NewIPLDPublisher(ipfsPath)
|
||||
case config.Bitcoin:
|
||||
return btc.NewIPLDPublisher(ipfsPath)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid chain %T for publisher constructor", chain)
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (repo *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
|
||||
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
|
||||
var headerID int64
|
||||
err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td) VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td) = ($3, $4, $5)
|
||||
|
Loading…
Reference in New Issue
Block a user