diff --git a/db/migrations/00043_create_btc_tx_inputs_table.sql b/db/migrations/00043_create_btc_tx_inputs_table.sql index 9e7b0600..5a70f4c3 100644 --- a/db/migrations/00043_create_btc_tx_inputs_table.sql +++ b/db/migrations/00043_create_btc_tx_inputs_table.sql @@ -4,8 +4,7 @@ CREATE TABLE btc.tx_inputs ( tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, index INTEGER NOT NULL, tx_witness BYTEA[], - sequence INTEGER NOT NULL, - script BYTEA NOT NULL, + sig_script BYTEA NOT NULL, outpoint_hash VARCHAR(66) NOT NULL, outpoint_index INTEGER NOT NULL, UNIQUE (tx_id, index) diff --git a/db/migrations/00044_create_btc_tx_outputs_table.sql b/db/migrations/00044_create_btc_tx_outputs_table.sql index db40a3d9..836455a3 100644 --- a/db/migrations/00044_create_btc_tx_outputs_table.sql +++ b/db/migrations/00044_create_btc_tx_outputs_table.sql @@ -4,7 +4,7 @@ CREATE TABLE btc.tx_outputs ( tx_id INTEGER NOT NULL REFERENCES btc.transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, index INTEGER NOT NULL, value INTEGER NOT NULL, - script BYTEA NOT NULL, + pk_script BYTEA NOT NULL, UNIQUE (tx_id, index) ); diff --git a/pkg/super_node/btc/converter.go b/pkg/super_node/btc/converter.go index 79d7f762..600d9bb0 100644 --- a/pkg/super_node/btc/converter.go +++ b/pkg/super_node/btc/converter.go @@ -37,17 +37,35 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Streame if !ok { return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload) } - txMeta := make([]TxModel, len(btcBlockPayload.Txs)) + txMeta := make([]TxModelWithInsAndOuts, len(btcBlockPayload.Txs)) for _, tx := range btcBlockPayload.Txs { index := tx.Index() - txModel := TxModel{ + txModel := TxModelWithInsAndOuts{ TxHash: tx.Hash().String(), Index: int64(tx.Index()), HasWitness: tx.HasWitness(), + TxOutputs: make([]TxOutput, len(tx.MsgTx().TxOut)), + TxInputs: make([]TxInput, len(tx.MsgTx().TxIn)), } if tx.HasWitness() { txModel.WitnessHash = tx.WitnessHash().String() } + for i, in := range tx.MsgTx().TxIn { + txModel.TxInputs[i] = TxInput{ + Index: int64(i), + SignatureScript: in.SignatureScript, + PreviousOutPointHash: in.PreviousOutPoint.Hash.String(), + PreviousOutPointIndex: in.PreviousOutPoint.Index, + TxWitness: in.Witness, + } + } + for i, out := range tx.MsgTx().TxOut { + txModel.TxOutputs[i] = TxOutput{ + Index: int64(i), + Value: out.Value, + PkScript: out.PkScript, + } + } txMeta[index] = txModel } return IPLDPayload{ diff --git a/pkg/super_node/btc/indexer.go b/pkg/super_node/btc/indexer.go index 8dd3c1ae..5e705a8b 100644 --- a/pkg/super_node/btc/indexer.go +++ b/pkg/super_node/btc/indexer.go @@ -15,3 +15,98 @@ // along with this program. If not, see . package btc + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/lib/pq" + + "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +type CIDIndexer struct { + db *postgres.DB +} + +func NewCIDIndexer(db *postgres.DB) *CIDIndexer { + return &CIDIndexer{ + db: db, + } +} + +func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { + cidWrapper, ok := cids.(*CIDPayload) + if !ok { + return fmt.Errorf("btc indexer expected cids type %T got %T", &CIDPayload{}, cids) + } + tx, err := in.db.Beginx() + if err != nil { + return err + } + headerID, err := in.indexHeaderCID(tx, cidWrapper.HeaderCID) + if err != nil { + return err + } + if err := in.indexTransactionCIDs(tx, cidWrapper.TransactionCIDs, headerID); err != nil { + return err + } + return tx.Commit() +} + +func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) { + var headerID int64 + err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, version, timestamp, bits) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, version, timestamp, bits) = ($3, $4, $5, $6, $7) + RETURNING id`, + header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Version, header.Timestamp, header.Bits).Scan(&headerID) + return headerID, err +} + +func (in *CIDIndexer) indexTransactionCIDs(tx *sqlx.Tx, transactions []TxModelWithInsAndOuts, headerID int64) error { + for _, transaction := range transactions { + txID, err := in.indexTransactionCID(tx, transaction, headerID) + if err != nil { + return err + } + for _, input := range transaction.TxInputs { + if err := in.indexTxInput(tx, input, txID); err != nil { + return err + } + } + for _, output := range transaction.TxOutputs { + if err := in.indexTxOutput(tx, output, txID); err != nil { + return err + } + } + } + return nil +} + +func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModelWithInsAndOuts, headerID int64) (int64, error) { + var txID int64 + err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, has_witness, witness_hash) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (index, cid, has_witness, witness_hash) = ($3, $4, $5, $6) + RETURNING id`, + headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.HasWitness, transaction.WitnessHash).Scan(&txID) + return txID, err +} + +func (in *CIDIndexer) indexTxInput(tx *sqlx.Tx, txInput TxInput, txID int64) error { + _, err := tx.Exec(`INSERT INTO btc.tx_inputs (tx_id, index, tx_witness, sig_script, outpoint_hash, outpoint_index) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (tx_id, index) DO UPDATE SET (tx_witness, sig_script, outpoint_hash, outpoint_index) = ($3, $4, $5, $6)`, + txID, txInput.Index, pq.Array(txInput.TxWitness), txInput.SignatureScript, txInput.PreviousOutPointHash, txInput.PreviousOutPointIndex) + return err +} + +func (in *CIDIndexer) indexTxOutput(tx *sqlx.Tx, txOuput TxOutput, txID int64) error { + _, err := tx.Exec(`INSERT INTO btc.tx_outputs (tx_id, index, value, pk_script) + VALUES ($1, $2, $3, $4) + ON CONFLICT (ix_id, index) DO UPDATE SET (value, pk_script) = ($3, $4)`, + txID, txOuput.Index, txOuput.Value, txOuput.PkScript) + return err +} diff --git a/pkg/super_node/btc/models.go b/pkg/super_node/btc/models.go index ff41908f..13cbad1f 100644 --- a/pkg/super_node/btc/models.go +++ b/pkg/super_node/btc/models.go @@ -23,9 +23,9 @@ type HeaderModel struct { BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` - Version int64 `db:"version"` + Version int32 `db:"version"` Timestamp int64 `db:"timestamp"` - Bits int64 `db:"bits"` + Bits uint32 `db:"bits"` } // TxModel is the db model for btc.transaction_cids table @@ -39,16 +39,28 @@ type TxModel struct { WitnessHash string `db:"witness_hash"` } +// TxModelWithInsAndOuts is the db model for btc.transaction_cids table that includes the children tx_input and tx_output tables +type TxModelWithInsAndOuts struct { + ID int64 `db:"id"` + HeaderID int64 `db:"header_id"` + Index int64 `db:"index"` + TxHash string `db:"tx_hash"` + CID string `db:"cid"` + HasWitness bool `db:"has_witness"` + WitnessHash string `db:"witness_hash"` + TxInputs []TxInput + TxOutputs []TxOutput +} + // TxInput is the db model for btc.tx_inputs table type TxInput struct { ID int64 `db:"id"` TxID int64 `db:"tx_id"` Index int64 `db:"index"` TxWitness [][]byte `db:"tx_witness"` - Sequence int64 `db:"sequence"` - SignatureScript []byte `db:"script"` - PreviousOutPointHash []byte `db:"outpoint_hash"` - PreviousOutPointIndex int64 `db:"outpoint_index"` + SignatureScript []byte `db:"sig_script"` + PreviousOutPointHash string `db:"outpoint_hash"` + PreviousOutPointIndex uint32 `db:"outpoint_index"` } // TxOutput is the db model for btc.tx_outputs table @@ -57,5 +69,5 @@ type TxOutput struct { TxID int64 `db:"tx_id"` Index int64 `db:"index"` Value int64 `db:"value"` - PkScript []byte `db:"script"` + PkScript []byte `db:"pk_script"` } diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index 72d360f2..fa5b6185 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -63,6 +63,9 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI ParentHash: ipldPayload.Header.PrevBlock.String(), BlockNumber: strconv.Itoa(int(ipldPayload.Height)), BlockHash: ipldPayload.Header.BlockHash().String(), + Version: ipldPayload.Header.Version, + Timestamp: ipldPayload.Header.Timestamp.UnixNano(), + Bits: ipldPayload.Header.Bits, } // Process and publish transactions transactionCids, err := pub.publishTransactions(ipldPayload.Txs, ipldPayload.TxMetaData) @@ -84,7 +87,7 @@ func (pub *IPLDPublisher) publishHeader(header *wire.BlockHeader) (string, error return cids[0], nil } -func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModel) ([]TxModel, error) { +func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) { transactionCids, err := pub.TransactionPutter.DagPut(transactions) if err != nil { return nil, err @@ -92,14 +95,16 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMet if len(transactionCids) != len(trxMeta) { return nil, errors.New("expected one CID for each transaction") } - mappedTrxCids := make([]TxModel, len(transactionCids)) + mappedTrxCids := make([]TxModelWithInsAndOuts, len(transactionCids)) for i, cid := range transactionCids { - mappedTrxCids[i] = TxModel{ + mappedTrxCids[i] = TxModelWithInsAndOuts{ CID: cid, Index: trxMeta[i].Index, TxHash: trxMeta[i].TxHash, HasWitness: trxMeta[i].HasWitness, WitnessHash: trxMeta[i].WitnessHash, + TxInputs: trxMeta[i].TxInputs, + TxOutputs: trxMeta[i].TxOutputs, } } return mappedTrxCids, nil diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index dbec9fbc..bcd96059 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -39,7 +39,7 @@ type BlockPayload struct { // Passed to IPLDPublisher and ResponseFilterer type IPLDPayload struct { BlockPayload - TxMetaData []TxModel + TxMetaData []TxModelWithInsAndOuts } func (ip IPLDPayload) Value() shared.StreamedIPLDs { @@ -51,7 +51,7 @@ func (ip IPLDPayload) Value() shared.StreamedIPLDs { // Passed to CIDIndexer type CIDPayload struct { HeaderCID HeaderModel - TransactionCIDs []TxModel + TransactionCIDs []TxModelWithInsAndOuts } // CIDWrapper is used to direct fetching of IPLDs from IPFS diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 7eec8f27..219c839d 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -48,6 +48,8 @@ func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer, switch chain { case config.Ethereum: return eth.NewCIDIndexer(db), nil + case config.Bitcoin: + return btc.NewCIDIndexer(db), nil default: return nil, fmt.Errorf("invalid chain %T for indexer constructor", chain) } @@ -128,6 +130,8 @@ func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPubli switch chain { case config.Ethereum: return eth.NewIPLDPublisher(ipfsPath) + case config.Bitcoin: + return btc.NewIPLDPublisher(ipfsPath) default: return nil, fmt.Errorf("invalid chain %T for publisher constructor", chain) } diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index 41c78ec3..9ea10c84 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -81,7 +81,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { return tx.Commit() } -func (repo *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) { +func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) { var headerID int64 err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td) = ($3, $4, $5)