From 8643a7f3b66adc6a15dbb3a934501357677e2307 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 7 Feb 2020 14:00:01 -0600 Subject: [PATCH] btc ipld fetcher, paylaod fetcher, and ipld resolver --- pkg/super_node/btc/ipld_fetcher.go | 113 ++++++++++++++++++++++++++ pkg/super_node/btc/payload_fetcher.go | 70 ++++++++++++++++ pkg/super_node/btc/resolver.go | 37 +++++++++ 3 files changed, 220 insertions(+) create mode 100644 pkg/super_node/btc/payload_fetcher.go diff --git a/pkg/super_node/btc/ipld_fetcher.go b/pkg/super_node/btc/ipld_fetcher.go index 8dd3c1ae..633cbf75 100644 --- a/pkg/super_node/btc/ipld_fetcher.go +++ b/pkg/super_node/btc/ipld_fetcher.go @@ -15,3 +15,116 @@ // along with this program. If not, see . package btc + +import ( + "context" + "errors" + "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + + "github.com/ipfs/go-block-format" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" +) + +var ( + errUnexpectedNumberOfIPLDs = errors.New("ipfs batch fetch returned unexpected number of IPLDs") +) + +// IPLDFetcher satisfies the IPLDFetcher interface for ethereum +type IPLDFetcher struct { + BlockService blockservice.BlockService +} + +// NewIPLDFetcher creates a pointer to a new IPLDFetcher +func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { + blockService, err := ipfs.InitIPFSBlockService(ipfsPath) + if err != nil { + return nil, err + } + return &IPLDFetcher{ + BlockService: blockService, + }, nil +} + +// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) { + cidWrapper, ok := cids.(*CIDWrapper) + if !ok { + return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) + } + log.Debug("fetching iplds") + iplds := new(IPLDWrapper) + iplds.BlockNumber = cidWrapper.BlockNumber + var err error + iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) + if err != nil { + return nil, err + } + iplds.Transactions, err = f.FetchTrxs(cidWrapper.Transactions) + if err != nil { + return nil, err + } + return iplds, nil +} + +// FetchHeaders fetches headers +// It uses the f.fetchBatch method +func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) { + log.Debug("fetching header iplds") + headerCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { + dc, err := cid.Decode(c.CID) + if err != nil { + return nil, err + } + headerCids = append(headerCids, dc) + } + headers := f.fetchBatch(headerCids) + if len(headers) != len(headerCids) { + log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) + return headers, errUnexpectedNumberOfIPLDs + } + return headers, nil +} + +// FetchTrxs fetches transactions +// It uses the f.fetchBatch method +func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]blocks.Block, error) { + log.Debug("fetching transaction iplds") + trxCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { + dc, err := cid.Decode(c.CID) + if err != nil { + return nil, err + } + trxCids = append(trxCids, dc) + } + trxs := f.fetchBatch(trxCids) + if len(trxs) != len(trxCids) { + log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) + return trxs, errUnexpectedNumberOfIPLDs + } + return trxs, nil +} + +// fetch is used to fetch a single cid +func (f *IPLDFetcher) fetch(cid cid.Cid) (blocks.Block, error) { + return f.BlockService.GetBlock(context.Background(), cid) +} + +// fetchBatch is used to fetch a batch of IPFS data blocks by cid +// There is no guarantee all are fetched, and no error in such a case, so +// downstream we will need to confirm which CIDs were fetched in the result set +func (f *IPLDFetcher) fetchBatch(cids []cid.Cid) []blocks.Block { + fetchedBlocks := make([]blocks.Block, 0, len(cids)) + blockChan := f.BlockService.GetBlocks(context.Background(), cids) + for block := range blockChan { + fetchedBlocks = append(fetchedBlocks, block) + } + return fetchedBlocks +} diff --git a/pkg/super_node/btc/payload_fetcher.go b/pkg/super_node/btc/payload_fetcher.go new file mode 100644 index 00000000..e778e843 --- /dev/null +++ b/pkg/super_node/btc/payload_fetcher.go @@ -0,0 +1,70 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package btc + +import ( + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// PayloadFetcher satisfies the PayloadFetcher interface for bitcoin +type PayloadFetcher struct { + // PayloadFetcher is thread-safe as long as the underlying client is thread-safe, since it has/modifies no other state + // http.Client is thread-safe + client *rpcclient.Client +} + +// NewStateDiffFetcher returns a PayloadFetcher +func NewPayloadFetcher(c *rpcclient.Client) *PayloadFetcher { + return &PayloadFetcher{ + client: c, + } +} + +// FetchAt fetches the block payloads at the given block heights +func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { + blockPayloads := make([]shared.RawChainData, len(blockHeights)) + for i, height := range blockHeights { + hash, err := fetcher.client.GetBlockHash(int64(height)) + if err != nil { + return nil, err + } + block, err := fetcher.client.GetBlock(hash) + if err != nil { + return nil, err + } + blockPayloads[i] = BlockPayload{ + Height: int32(height), + Header: &block.Header, + Txs: msgTxsToUtilTxs(block.Transactions), + } + } + return blockPayloads, nil +} + +func msgTxsToUtilTxs(msgs []*wire.MsgTx) []*btcutil.Tx { + txs := make([]*btcutil.Tx, len(msgs)) + for i, msg := range msgs { + tx := btcutil.NewTx(msg) + tx.SetIndex(i) + txs[i] = tx + } + return txs +} diff --git a/pkg/super_node/btc/resolver.go b/pkg/super_node/btc/resolver.go index 8dd3c1ae..e7788362 100644 --- a/pkg/super_node/btc/resolver.go +++ b/pkg/super_node/btc/resolver.go @@ -15,3 +15,40 @@ // along with this program. If not, see . package btc + +import ( + "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + + "github.com/ipfs/go-block-format" +) + +// IPLDResolver satisfies the IPLDResolver interface for bitcoin +type IPLDResolver struct{} + +// NewIPLDResolver returns a pointer to an IPLDResolver which satisfies the IPLDResolver interface +func NewIPLDResolver() *IPLDResolver { + return &IPLDResolver{} +} + +// Resolve is the exported method for resolving all of the BTC IPLDs packaged in an IpfsBlockWrapper +func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) { + ipfsBlocks, ok := iplds.(*IPLDWrapper) + if !ok { + return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds) + } + return StreamResponse{ + BlockNumber: ipfsBlocks.BlockNumber, + SerializedHeaders: eir.resolve(ipfsBlocks.Headers), + SerializedTxs: eir.resolve(ipfsBlocks.Transactions), + }, nil +} + +func (eir *IPLDResolver) resolve(iplds []blocks.Block) [][]byte { + rlps := make([][]byte, 0, len(iplds)) + for _, ipld := range iplds { + rlps = append(rlps, ipld.RawData()) + } + return rlps +}