index all cids in a payload in a single atomic tx; misc fixes; comment additions

This commit is contained in:
Ian Norden 2019-04-17 13:26:48 -05:00
parent 79efaeb089
commit 31a9017c4f
8 changed files with 167 additions and 88 deletions

View File

@ -20,7 +20,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -31,20 +30,19 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/geth/node" "github.com/vulcanize/vulcanizedb/pkg/geth/node"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/utils" "github.com/vulcanize/vulcanizedb/utils"
"github.com/vulcanize/vulcanizedb/pkg/core"
) )
// syncAndPublishCmd represents the syncAndPublish command // syncAndPublishCmd represents the syncAndPublish command
var syncAndPublishCmd = &cobra.Command{ var syncAndPublishCmd = &cobra.Command{
Use: "syncAndPublish", Use: "syncAndPublish",
Short: "A brief description of your command", Short: "Syncs all Ethereum data into IPFS, indexing the CIDs",
Long: `A longer description that spans multiple lines and likely contains examples Long: `This command works alongside a modified geth node which streams
and usage of using your command. For example: all block and state (diff) data over a websocket subscription. This process
then converts the eth objects to IPLDs and publishes them IPFS. Additionally,
Cobra is a CLI library for Go that empowers applications. it maintains a local index of the IPLD objects' CIDs in Postgres.`,
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
syncAndPulish() syncAndPublish()
}, },
} }
@ -54,10 +52,10 @@ var (
func init() { func init() {
rootCmd.AddCommand(syncAndPublishCmd) rootCmd.AddCommand(syncAndPublishCmd)
syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "", "Path for configuring IPFS node") syncAndPublishCmd.Flags().StringVarP(&ipfsPath, "ipfs-path", "i", "~/.ipfs", "Path for configuring IPFS node")
} }
func syncAndPulish() { func syncAndPublish() {
blockChain, ethClient, rpcClient := getBlockChainAndClients() blockChain, ethClient, rpcClient := getBlockChainAndClients()
db := utils.LoadPostgres(databaseConfig, blockChain.Node()) db := utils.LoadPostgres(databaseConfig, blockChain.Node())
@ -68,8 +66,11 @@ func syncAndPulish() {
} }
wg := syn.WaitGroup{} wg := syn.WaitGroup{}
indexer.Index(wg) err = indexer.Index(wg)
wg.Wait() if err != nil {
log.Fatal(err)
}
wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through
} }
func getBlockChainAndClients() (*geth.BlockChain, core.EthClient, core.RpcClient) { func getBlockChainAndClients() (*geth.BlockChain, core.EthClient, core.RpcClient) {

View File

@ -18,10 +18,10 @@ package eth
import ( import (
"errors" "errors"
"github.com/ethereum/go-ethereum"
"math/big" "math/big"
"strconv" "strconv"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"

View File

@ -17,6 +17,7 @@
package client package client
import ( import (
"errors"
"context" "context"
"errors" "errors"
"reflect" "reflect"

View File

@ -17,23 +17,27 @@
package ipfs package ipfs
import ( import (
log "github.com/sirupsen/logrus"
"sync" "sync"
log "github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
const payloadChanBufferSize = 800 // 1/10th max size const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
// Indexer is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
// This is the top-level interface used by the syncAndPublish command
type Indexer interface { type Indexer interface {
Index() error Index(wg sync.WaitGroup) error
} }
type IPFSIndexer struct { // ipfsIndexer is the underlying struct for the Indexer interface
Syncer Syncer // we are not exporting this to enforce proper initialization through the NewIPFSIndexer function
type ipfsIndexer struct {
Streamer Streamer
Converter Converter Converter Converter
Publisher Publisher Publisher Publisher
Repository Repository Repository Repository
@ -41,13 +45,14 @@ type IPFSIndexer struct {
QuitChan chan bool QuitChan chan bool
} }
func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (*IPFSIndexer, error) { // NewIPFSIndexer creates a new Indexer interface using an underlying ipfsIndexer struct
func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (Indexer, error) {
publisher, err := NewIPLDPublisher(ipfsPath) publisher, err := NewIPLDPublisher(ipfsPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &IPFSIndexer{ return &ipfsIndexer{
Syncer: NewStateDiffSyncer(rpcClient), Streamer: NewStateDiffSyncer(rpcClient),
Repository: NewCIDRepository(db), Repository: NewCIDRepository(db),
Converter: NewPayloadConverter(ethClient), Converter: NewPayloadConverter(ethClient),
Publisher: publisher, Publisher: publisher,
@ -56,9 +61,9 @@ func NewIPFSIndexer(ipfsPath string, db *postgres.DB, ethClient core.EthClient,
}, nil }, nil
} }
// The main processing loop for the syncAndPublish // Index is the main processing loop
func (i *IPFSIndexer) Index(wg sync.WaitGroup) error { func (i *ipfsIndexer) Index(wg sync.WaitGroup) error {
sub, err := i.Syncer.Sync(i.PayloadChan) sub, err := i.Streamer.Stream(i.PayloadChan)
if err != nil { if err != nil {
return err return err
} }
@ -76,7 +81,7 @@ func (i *IPFSIndexer) Index(wg sync.WaitGroup) error {
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
err = i.Repository.IndexCIDs(cidPayload) err = i.Repository.Index(cidPayload)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }

View File

@ -28,14 +28,17 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
// Converter interface is used to convert a geth statediff.Payload to our IPLDPayload type
type Converter interface { type Converter interface {
Convert(payload statediff.Payload) (*IPLDPayload, error) Convert(payload statediff.Payload) (*IPLDPayload, error)
} }
// PayloadConverter is the underlying struct for the Converter interface
type PayloadConverter struct { type PayloadConverter struct {
client core.EthClient client core.EthClient
} }
// IPLDPayload is a custom type which packages ETH data for the IPFS publisher
type IPLDPayload struct { type IPLDPayload struct {
HeaderRLP []byte HeaderRLP []byte
BlockNumber *big.Int BlockNumber *big.Int
@ -46,12 +49,14 @@ type IPLDPayload struct {
StorageLeafs map[common.Hash]map[common.Hash][]byte StorageLeafs map[common.Hash]map[common.Hash][]byte
} }
// NewPayloadConverter creates a pointer to a new PayloadConverter which satisfies the Converter interface
func NewPayloadConverter(client core.EthClient) *PayloadConverter { func NewPayloadConverter(client core.EthClient) *PayloadConverter {
return &PayloadConverter{ return &PayloadConverter{
client: client, client: client,
} }
} }
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *PayloadConverter) Convert(payload statediff.Payload) (*IPLDPayload, error) { func (pc *PayloadConverter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Unpack block rlp to access fields // Unpack block rlp to access fields
block := new(types.Block) block := new(types.Block)

View File

@ -19,6 +19,7 @@ package ipfs
import ( import (
"errors" "errors"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/eth-block-extractor/pkg/ipfs" "github.com/vulcanize/eth-block-extractor/pkg/ipfs"
@ -30,10 +31,12 @@ import (
"github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp" "github.com/vulcanize/eth-block-extractor/pkg/wrappers/rlp"
) )
// Publisher is the interface for publishing an IPLD payload
type Publisher interface { type Publisher interface {
Publish(payload *IPLDPayload) (*CIDPayload, error) Publish(payload *IPLDPayload) (*CIDPayload, error)
} }
// IPLDPublisher is the underlying struct for the Publisher interface
type IPLDPublisher struct { type IPLDPublisher struct {
Node *ipfs.IPFS Node *ipfs.IPFS
HeaderPutter *eth_block_header.BlockHeaderDagPutter HeaderPutter *eth_block_header.BlockHeaderDagPutter
@ -43,6 +46,7 @@ type IPLDPublisher struct {
StoragePutter *eth_storage_trie.StorageTrieDagPutter StoragePutter *eth_storage_trie.StorageTrieDagPutter
} }
// CID payload is a struct to hold all the CIDs and their meta data
type CIDPayload struct { type CIDPayload struct {
BlockNumber string BlockNumber string
BlockHash string BlockHash string
@ -53,6 +57,7 @@ type CIDPayload struct {
StorageLeafCIDs map[common.Hash]map[common.Hash]string StorageLeafCIDs map[common.Hash]map[common.Hash]string
} }
// NewIPLDPublisher creates a pointer to a new IPLDPublisher which satisfies the Publisher interface
func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
node, err := ipfs.InitIPFSNode(ipfsPath) node, err := ipfs.InitIPFSNode(ipfsPath)
if err != nil { if err != nil {
@ -69,45 +74,94 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
}, nil }, nil
} }
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
// Process and publish headers // Process and publish headers
headerCids, err := pub.HeaderPutter.DagPut(payload.HeaderRLP) headerCid, err := pub.publishHeaders(payload.HeaderRLP)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(headerCids) != 1 {
return nil, errors.New("single CID expected to be returned for header")
}
// Process and publish transactions // Process and publish transactions
transactionCids, err := pub.TransactionPutter.DagPut(payload.BlockBody) transactionCids, err := pub.publishTransactions(payload.BlockBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(transactionCids) != len(payload.BlockBody.Transactions) {
return nil, errors.New("expected one CID for each transaction")
}
trxCids := make(map[common.Hash]string, len(transactionCids))
for i, trx := range payload.BlockBody.Transactions {
trxCids[trx.Hash()] = transactionCids[i]
}
// Process and publish receipts // Process and publish receipts
receiptsCids, err := pub.ReceiptPutter.DagPut(payload.Receipts) receiptsCids, err := pub.publishReceipts(payload.Receipts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(receiptsCids) != len(payload.Receipts) {
return nil, errors.New("expected one CID for each receipt")
}
rctCids := make(map[common.Hash]string, len(receiptsCids))
for i, rct := range payload.Receipts {
rctCids[rct.TxHash] = receiptsCids[i]
}
// Process and publish state leafs // Process and publish state leafs
stateLeafCids, err := pub.publishStateLeafs(payload.StateLeafs)
if err != nil {
return nil, err
}
// Process and publish storage leafs
storageLeafCids, err := pub.publishStorageLeafs(payload.StorageLeafs)
if err != nil {
return nil, err
}
// Package CIDs into a single struct
return &CIDPayload{
BlockHash: payload.BlockHash.Hex(),
BlockNumber: payload.BlockNumber.String(),
HeaderCID: headerCid,
TransactionCIDs: transactionCids,
ReceiptCIDs: receiptsCids,
StateLeafCIDs: stateLeafCids,
StorageLeafCIDs: storageLeafCids,
}, nil
}
func (pub *IPLDPublisher) publishHeaders(headerRLP []byte) (string, error) {
headerCids, err := pub.HeaderPutter.DagPut(headerRLP)
if err != nil {
return "", err
}
if len(headerCids) != 1 {
return "", errors.New("single CID expected to be returned for header")
}
return headerCids[0], nil
}
func (pub *IPLDPublisher) publishTransactions(blockBody *types.Body) (map[common.Hash]string, error) {
transactionCids, err := pub.TransactionPutter.DagPut(blockBody)
if err != nil {
return nil, err
}
if len(transactionCids) != len(blockBody.Transactions) {
return nil, errors.New("expected one CID for each transaction")
}
mappedTrxCids := make(map[common.Hash]string, len(transactionCids))
for i, trx := range blockBody.Transactions {
mappedTrxCids[trx.Hash()] = transactionCids[i]
}
return mappedTrxCids, nil
}
func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts) (map[common.Hash]string, error) {
receiptsCids, err := pub.ReceiptPutter.DagPut(receipts)
if err != nil {
return nil, err
}
if len(receiptsCids) != len(receipts) {
return nil, errors.New("expected one CID for each receipt")
}
mappedRctCids := make(map[common.Hash]string, len(receiptsCids))
for i, rct := range receipts {
mappedRctCids[rct.TxHash] = receiptsCids[i]
}
return mappedRctCids, nil
}
func (pub *IPLDPublisher) publishStateLeafs(stateLeafs map[common.Hash][]byte) (map[common.Hash]string, error) {
stateLeafCids := make(map[common.Hash]string) stateLeafCids := make(map[common.Hash]string)
for addr, leaf := range payload.StateLeafs { for addr, leaf := range stateLeafs {
stateLeafCid, err := pub.StatePutter.DagPut(leaf) stateLeafCid, err := pub.StatePutter.DagPut(leaf)
if err != nil { if err != nil {
return nil, err return nil, err
@ -117,10 +171,12 @@ func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
} }
stateLeafCids[addr] = stateLeafCid[0] stateLeafCids[addr] = stateLeafCid[0]
} }
return stateLeafCids, nil
}
// Process and publish storage leafs func (pub *IPLDPublisher) publishStorageLeafs(storageLeafs map[common.Hash]map[common.Hash][]byte) (map[common.Hash]map[common.Hash]string, error) {
storageLeafCids := make(map[common.Hash]map[common.Hash]string) storageLeafCids := make(map[common.Hash]map[common.Hash]string)
for addr, storageTrie := range payload.StorageLeafs { for addr, storageTrie := range storageLeafs {
storageLeafCids[addr] = make(map[common.Hash]string) storageLeafCids[addr] = make(map[common.Hash]string)
for key, leaf := range storageTrie { for key, leaf := range storageTrie {
storageLeafCid, err := pub.StoragePutter.DagPut(leaf) storageLeafCid, err := pub.StoragePutter.DagPut(leaf)
@ -133,14 +189,5 @@ func (pub *IPLDPublisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
storageLeafCids[addr][key] = storageLeafCid[0] storageLeafCids[addr][key] = storageLeafCid[0]
} }
} }
return storageLeafCids, nil
return &CIDPayload{
BlockHash: payload.BlockHash.Hex(),
BlockNumber: payload.BlockNumber.String(),
HeaderCID: headerCids[0],
TransactionCIDs: trxCids,
ReceiptCIDs: rctCids,
StateLeafCIDs: stateLeafCids,
StorageLeafCIDs: storageLeafCids,
}, nil
} }

View File

@ -17,56 +17,72 @@
package ipfs package ipfs
import ( import (
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
) )
// Repository is an interface for indexing CIDPayloads
type Repository interface { type Repository interface {
IndexCIDs(cidPayload *CIDPayload) error Index(cidPayload *CIDPayload) error
} }
// CIDRepository is the underlying struct for the Repository interface
type CIDRepository struct { type CIDRepository struct {
db *postgres.DB db *postgres.DB
} }
// NewCIDRepository creates a new pointer to a CIDRepository which satisfies the Repository interface
func NewCIDRepository(db *postgres.DB) *CIDRepository { func NewCIDRepository(db *postgres.DB) *CIDRepository {
return &CIDRepository{ return &CIDRepository{
db: db, db: db,
} }
} }
func (repo *CIDRepository) IndexCIDs(cidPayload *CIDPayload) error { // IndexCIDs indexes a cidPayload in Postgres
headerID, err := repo.indexHeaderCID(cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash) func (repo *CIDRepository) Index(cidPayload *CIDPayload) error {
tx, _ := repo.db.Beginx()
headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash)
if err != nil { if err != nil {
tx.Rollback()
return err return err
} }
err = repo.indexTransactionAndReceiptCIDs(cidPayload, headerID) err = repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID)
if err != nil { if err != nil {
tx.Rollback()
return err return err
} }
err = repo.indexStateAndStorageCIDs(tx, cidPayload, headerID)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
} }
func (repo *CIDRepository) indexHeaderCID(cid, blockNumber, hash string) (int64, error) { func (repo *CIDRepository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
var headerID int64 var headerID int64
err := repo.db.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid) VALUES ($1, $2, $3) err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid) VALUES ($1, $2, $3)
RETURNING id ON CONFLICT DO UPDATE SET cid = $3
ON CONFLICT DO UPDATE SET cid = $3`, blockNumber, hash, cid).Scan(&headerID) RETURNING id`,
blockNumber, hash, cid).Scan(&headerID)
return headerID, err return headerID, err
} }
func (repo *CIDRepository) indexTransactionAndReceiptCIDs(payload *CIDPayload, headerID int64) error { func (repo *CIDRepository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
for hash, trxCid := range payload.TransactionCIDs { for hash, trxCid := range payload.TransactionCIDs {
var txID int64 var txID int64
err := repo.db.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid) VALUES ($1, $2, $3) err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid) VALUES ($1, $2, $3)
RETURNING id ON CONFLICT DO UPDATE SET cid = $3
ON CONFLICT DO UPDATE SET cid = $3`, headerID, hash.Hex(), trxCid).Scan(&txID) RETURNING id`,
headerID, hash.Hex(), trxCid).Scan(&txID)
if err != nil { if err != nil {
return err return err
} }
receiptCid, ok := payload.ReceiptCIDs[hash] receiptCid, ok := payload.ReceiptCIDs[hash]
if ok { if ok {
err = repo.indexReceiptCID(receiptCid, txID) err = repo.indexReceiptCID(tx, receiptCid, txID)
if err != nil { if err != nil {
return err return err
} }
@ -75,23 +91,24 @@ func (repo *CIDRepository) indexTransactionAndReceiptCIDs(payload *CIDPayload, h
return nil return nil
} }
func (repo *CIDRepository) indexReceiptCID(cid string, txId int64) error { func (repo *CIDRepository) indexReceiptCID(tx *sqlx.Tx, cid string, txId int64) error {
_, err := repo.db.Exec(`INSERT INTO public.receipt_cids (tx_id, cid) VALUES ($1, $2) _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid) VALUES ($1, $2)
ON CONFLICT DO UPDATE SET cid = $2`, txId, cid) ON CONFLICT DO UPDATE SET cid = $2`, txId, cid)
return err return err
} }
func (repo *CIDRepository) indexStateAndStorageCIDs(payload *CIDPayload, headerID int64) error { func (repo *CIDRepository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
for accountKey, stateCID := range payload.StateLeafCIDs { for accountKey, stateCID := range payload.StateLeafCIDs {
var stateID int64 var stateID int64
err := repo.db.QueryRowx(`INSERT INTO public.state_cids (header_id, account_key, cid) VALUES ($1, $2, $3) err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, account_key, cid) VALUES ($1, $2, $3)
RETURNING id ON CONFLICT DO UPDATE SET cid = $3
ON CONFLICT DO UPDATE SET cid = $3`, headerID, accountKey.Hex(), stateCID).Scan(&stateID) RETURNING id`,
headerID, accountKey.Hex(), stateCID).Scan(&stateID)
if err != nil { if err != nil {
return err return err
} }
for storageKey, storageCID := range payload.StorageLeafCIDs[accountKey] { for storageKey, storageCID := range payload.StorageLeafCIDs[accountKey] {
err = repo.indexStorageCID(storageKey.Hex(), storageCID, stateID) err = repo.indexStorageCID(tx, storageKey.Hex(), storageCID, stateID)
if err != nil { if err != nil {
return err return err
} }
@ -100,7 +117,7 @@ func (repo *CIDRepository) indexStateAndStorageCIDs(payload *CIDPayload, headerI
return nil return nil
} }
func (repo *CIDRepository) indexStorageCID(key, cid string, stateId int64) error { func (repo *CIDRepository) indexStorageCID(tx *sqlx.Tx, key, cid string, stateId int64) error {
_, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid) VALUES ($1, $2, $3) _, err := repo.db.Exec(`INSERT INTO public.storage_cids (state_id, storage_key, cid) VALUES ($1, $2, $3)
ON CONFLICT DO UPDATE SET cid = $3`, stateId, key, cid) ON CONFLICT DO UPDATE SET cid = $3`, stateId, key, cid)
return err return err

View File

@ -23,22 +23,25 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
) )
type Syncer interface { // Streamer is the interface for streaming a statediff subscription
Sync(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) type Streamer interface {
Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
} }
type StateDiffSyncer struct { // StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
Client core.RpcClient Client core.RpcClient
PayloadChan chan statediff.Payload PayloadChan chan statediff.Payload
} }
func NewStateDiffSyncer(client core.RpcClient) *StateDiffSyncer { // NewStateDiffStreamer creates a pointer to a new StateDiffStreamer which satisfies the Streamer interface
return &StateDiffSyncer{ func NewStateDiffSyncer(client core.RpcClient) *StateDiffStreamer {
return &StateDiffStreamer{
Client: client, Client: client,
} }
} }
// Sync is the main loop for subscribing to data from the Geth state diff process // Stream is the main loop for subscribing to data from the Geth state diff process
func (i *StateDiffSyncer) Sync(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { func (i *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
return i.Client.Subscribe("statediff", i.PayloadChan) return i.Client.Subscribe("statediff", i.PayloadChan)
} }