finish wasm watcher engine

This commit is contained in:
Ian Norden 2020-02-27 15:07:33 -06:00
parent 330a083749
commit 8c2a71b16f
17 changed files with 355 additions and 87 deletions

View File

@ -133,14 +133,14 @@ func streamEthSubscription() {
// This assumes leafs only
for _, stateNode := range ethData.StateNodes {
var acct state.Account
err = rlp.Decode(bytes.NewBuffer(stateNode.IPLD.Data), &acct)
err = rlp.DecodeBytes(stateNode.IPLD.Data, &acct)
if err != nil {
logWithCommand.Error(err)
continue
}
fmt.Printf("Account for key %s, and root %s, with balance %d\n",
stateNode.StateTrieKey.Hex(), acct.Root.Hex(), acct.Balance.Int64())
fmt.Printf("state account: %v\n", acct)
fmt.Printf("Account for key %s, and root %s, with balance %s\n",
stateNode.StateTrieKey.Hex(), acct.Root.Hex(), acct.Balance.String())
fmt.Printf("state account: %+v\n", acct)
}
for _, storageNode := range ethData.StorageNodes {
fmt.Printf("Storage for state key %s ", storageNode.StateTrieKey.Hex())

View File

@ -76,10 +76,10 @@ func staticRewardByBlockNumber(blockNumber int64) *big.Int {
return staticBlockReward
}
func CalcEthBlockReward(block *types.Block, receipts types.Receipts) *big.Int {
staticBlockReward := staticRewardByBlockNumber(block.Number().Int64())
transactionFees := calcEthTransactionFees(block, receipts)
uncleInclusionRewards := calcEthUncleInclusionRewards(block, block.Uncles())
func CalcEthBlockReward(header *types.Header, uncles []*types.Header, txs types.Transactions, receipts types.Receipts) *big.Int {
staticBlockReward := staticRewardByBlockNumber(header.Number.Int64())
transactionFees := calcEthTransactionFees(txs, receipts)
uncleInclusionRewards := calcEthUncleInclusionRewards(header, uncles)
tmp := transactionFees.Add(transactionFees, uncleInclusionRewards)
return tmp.Add(tmp, staticBlockReward)
}
@ -94,9 +94,9 @@ func CalcUncleMinerReward(blockNumber, uncleBlockNumber int64) *big.Int {
return rewardDiv8.Mul(rewardDiv8, uncleBlockPlus8MinusMainBlock)
}
func calcEthTransactionFees(block *types.Block, receipts types.Receipts) *big.Int {
func calcEthTransactionFees(txs types.Transactions, receipts types.Receipts) *big.Int {
transactionFees := new(big.Int)
for i, transaction := range block.Transactions() {
for i, transaction := range txs {
receipt := receipts[i]
gasPrice := big.NewInt(transaction.GasPrice().Int64())
gasUsed := big.NewInt(int64(receipt.GasUsed))
@ -106,10 +106,10 @@ func calcEthTransactionFees(block *types.Block, receipts types.Receipts) *big.In
return transactionFees
}
func calcEthUncleInclusionRewards(block *types.Block, uncles []*types.Header) *big.Int {
func calcEthUncleInclusionRewards(header *types.Header, uncles []*types.Header) *big.Int {
uncleInclusionRewards := new(big.Int)
for range uncles {
staticBlockReward := staticRewardByBlockNumber(block.Number().Int64())
staticBlockReward := staticRewardByBlockNumber(header.Number.Int64())
staticBlockReward.Div(staticBlockReward, big.NewInt(32))
uncleInclusionRewards.Add(uncleInclusionRewards, staticBlockReward)
}

View File

@ -41,8 +41,8 @@ type ConvertedPayload struct {
}
// Height satisfies the StreamedIPLDs interface
func (i ConvertedPayload) Height() int64 {
return i.BlockPayload.BlockHeight
func (cp ConvertedPayload) Height() int64 {
return cp.BlockPayload.BlockHeight
}
// CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres

View File

@ -71,8 +71,8 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
return nil, err
}
txMeta := TxModel{
Dst: handleNullAddr(trx.To()),
Src: handleNullAddr(&from),
Dst: shared.HandleNullAddr(trx.To()),
Src: shared.HandleNullAddr(&from),
TxHash: trx.Hash().String(),
Index: int64(i),
}
@ -169,10 +169,3 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
}
return convertedPayload, nil
}
func handleNullAddr(to *common.Address) string {
if to == nil {
return "0x0000000000000000000000000000000000000000000000000000000000000000"
}
return to.Hex()
}

View File

@ -51,6 +51,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh
}
if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) {
response := new(IPLDs)
response.TotalDifficulty = ethPayload.TotalDifficulty
if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil {
return IPLDs{}, err
}

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format"
@ -58,9 +59,13 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
iplds := IPLDs{}
iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10)
if !ok {
return nil, errors.New("eth fetcher: unable to set total difficulty")
}
iplds.BlockNumber = cidWrapper.BlockNumber
iplds.Header, err = f.FetchHeader(cidWrapper.Header)
if err != nil {
return nil, err

View File

@ -51,7 +51,8 @@ var (
mockCIDWrapper = &eth.CIDWrapper{
BlockNumber: big.NewInt(9000),
Header: eth.HeaderModel{
CID: mockHeaderBlock.Cid().String(),
TotalDifficulty: "1337",
CID: mockHeaderBlock.Cid().String(),
},
Uncles: []eth.UncleModel{
{
@ -104,6 +105,7 @@ var _ = Describe("Fetcher", func() {
Expect(err).ToNot(HaveOccurred())
iplds, ok := i.(eth.IPLDs)
Expect(ok).To(BeTrue())
Expect(iplds.TotalDifficulty).To(Equal(big.NewInt(1337)))
Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber))
Expect(iplds.Header).To(Equal(ipfs.BlockModel{
Data: mockHeaderBlock.RawData(),

View File

@ -64,7 +64,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
if err != nil {
return nil, err
}
reward := common2.CalcEthBlockReward(ipldPayload.Block, ipldPayload.Receipts)
reward := common2.CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
header := HeaderModel{
CID: headerCid,
ParentHash: ipldPayload.Block.ParentHash().String(),

View File

@ -78,13 +78,14 @@ type CIDWrapper struct {
// IPLDs is used to package raw IPLD block data fetched from IPFS and returned by the server
// Returned by IPLDFetcher and ResponseFilterer
type IPLDs struct {
BlockNumber *big.Int
Header ipfs.BlockModel
Uncles []ipfs.BlockModel
Transactions []ipfs.BlockModel
Receipts []ipfs.BlockModel
StateNodes []StateNode
StorageNodes []StorageNode
BlockNumber *big.Int
TotalDifficulty *big.Int
Header ipfs.BlockModel
Uncles []ipfs.BlockModel
Transactions []ipfs.BlockModel
Receipts []ipfs.BlockModel
StateNodes []StateNode
StorageNodes []StorageNode
}
// Height satisfies the StreamedIPLDs interface

View File

@ -19,6 +19,8 @@ package shared
import (
"bytes"
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
@ -51,3 +53,11 @@ func ListContainsGap(gapList []Gap, gap Gap) bool {
}
return false
}
// HandleNullAddr converts a nil pointer to an address to a zero-valued hex address string
func HandleNullAddr(to *common.Address) string {
if to == nil {
return "0x0000000000000000000000000000000000000000000000000000000000000000"
}
return to.Hex()
}

View File

@ -41,6 +41,7 @@ func NewWASMInstantiator(db *postgres.DB, instances []WasmFunction) *Instantiato
// Instantiate is used to load the WASM functions into Postgres
func (i *Instantiator) Instantiate() error {
// TODO: enable instantiation of WASM functions from IPFS
tx, err := i.db.Beginx()
if err != nil {
return err

View File

@ -0,0 +1 @@
package btc

View File

@ -0,0 +1,166 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// WatcherConverter converts watched data into models for the trigger tables
type WatcherConverter struct {
chainConfig *params.ChainConfig
}
// NewWatcherConverter creates a pointer to a new WatcherConverter
func NewWatcherConverter(chainConfig *params.ChainConfig) *WatcherConverter {
return &WatcherConverter{
chainConfig: chainConfig,
}
}
// Convert method is used to convert eth iplds to an cid payload
// Satisfies the shared.PayloadConverter interface
func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error) {
numTxs := len(ethIPLDs.Transactions)
numRcts := len(ethIPLDs.Receipts)
if numTxs != numRcts {
return nil, fmt.Errorf("eth converter needs same numbe of receipts and transactions, have %d transactions and %d receipts", numTxs, numRcts)
}
// Initialize the payload struct and its fields
cids := new(eth.CIDPayload)
cids.UncleCIDs = make([]eth.UncleModel, len(ethIPLDs.Uncles))
cids.TransactionCIDs = make([]eth.TxModel, numTxs)
cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs)
cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes))
cids.StorageNodeCIDs = make(map[common.Hash][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
// Unpack header
var header types.Header
if err := rlp.DecodeBytes(ethIPLDs.Header.Data, &header); err != nil {
return nil, err
}
// Collect uncles so we can derive miner reward
uncles := make([]*types.Header, len(ethIPLDs.Uncles))
for i, uncleIPLD := range ethIPLDs.Uncles {
var uncle types.Header
if err := rlp.DecodeBytes(uncleIPLD.Data, &uncle); err != nil {
return nil, err
}
uncleReward := common2.CalcUncleMinerReward(header.Number.Int64(), uncle.Number.Int64())
uncles[i] = &uncle
// Uncle data
cids.UncleCIDs[i] = eth.UncleModel{
CID: uncleIPLD.CID,
BlockHash: uncle.Hash().String(),
ParentHash: uncle.ParentHash.String(),
Reward: uncleReward.String(),
}
}
// Collect transactions so we can derive receipt fields and miner reward
signer := types.MakeSigner(pc.chainConfig, header.Number)
transactions := make(types.Transactions, len(ethIPLDs.Transactions))
for i, txIPLD := range ethIPLDs.Transactions {
var tx types.Transaction
if err := rlp.DecodeBytes(txIPLD.Data, &tx); err != nil {
return nil, err
}
transactions[i] = &tx
from, err := types.Sender(signer, &tx)
if err != nil {
return nil, err
}
// Tx data
cids.TransactionCIDs[i] = eth.TxModel{
Dst: shared.HandleNullAddr(tx.To()),
Src: shared.HandleNullAddr(&from),
TxHash: tx.Hash().String(),
Index: int64(i),
CID: txIPLD.CID,
}
}
// Collect receipts so that we can derive the rest of their fields and miner reward
receipts := make(types.Receipts, len(ethIPLDs.Receipts))
for i, rctIPLD := range ethIPLDs.Receipts {
var rct types.Receipt
if err := rlp.DecodeBytes(rctIPLD.Data, &rct); err != nil {
return nil, err
}
receipts[i] = &rct
}
if err := receipts.DeriveFields(pc.chainConfig, header.Hash(), header.Number.Uint64(), transactions); err != nil {
return nil, err
}
for i, receipt := range receipts {
matchedTx := transactions[i]
if matchedTx.To() != nil {
receipt.ContractAddress = *transactions[i].To()
}
topicSets := make([][]string, 4)
for _, log := range receipt.Logs {
for i := range topicSets {
if i < len(log.Topics) {
topicSets[i] = append(topicSets[i], log.Topics[i].Hex())
}
}
}
// Rct data
cids.ReceiptCIDs[matchedTx.Hash()] = eth.ReceiptModel{
CID: ethIPLDs.Receipts[i].CID,
Topic0s: topicSets[0],
Topic1s: topicSets[1],
Topic2s: topicSets[2],
Topic3s: topicSets[3],
Contract: receipt.ContractAddress.Hex(),
}
}
minerReward := common2.CalcEthBlockReward(&header, uncles, transactions, receipts)
// Header data
cids.HeaderCID = eth.HeaderModel{
CID: ethIPLDs.Header.CID,
ParentHash: header.ParentHash.String(),
BlockHash: header.Hash().String(),
BlockNumber: header.Number.String(),
TotalDifficulty: ethIPLDs.TotalDifficulty.String(),
Reward: minerReward.String(),
}
// State data
for i, stateIPLD := range ethIPLDs.StateNodes {
cids.StateNodeCIDs[i] = eth.StateNodeModel{
CID: stateIPLD.IPLD.CID,
Leaf: stateIPLD.Leaf,
StateKey: stateIPLD.StateTrieKey.String(),
}
}
// Storage data
for _, storageIPLD := range ethIPLDs.StorageNodes {
cids.StorageNodeCIDs[storageIPLD.StateTrieKey] = append(cids.StorageNodeCIDs[storageIPLD.StateTrieKey], eth.StorageNodeModel{
CID: storageIPLD.IPLD.CID,
Leaf: storageIPLD.Leaf,
StorageKey: storageIPLD.StorageTrieKey.String(),
})
}
return cids, nil
}

View File

@ -17,20 +17,25 @@
package eth
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"io/ioutil"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/watcher/shared"
)
var (
vacuumThreshold int64 = 5000 // dont know how to decided what this should be set to
vacuumThreshold int64 = 5000
)
// Repository is the underlying struct for satisfying the shared.Repository interface for eth
type Repository struct {
cidIndexer *eth.CIDIndexer
converter *WatcherConverter
db *postgres.DB
triggerFunctions []string
deleteCalls int64
@ -39,6 +44,8 @@ type Repository struct {
// NewRepository returns a new eth.Repository that satisfies the shared.Repository interface
func NewRepository(db *postgres.DB, triggerFunctions []string) shared.Repository {
return &Repository{
cidIndexer: eth.NewCIDIndexer(db),
converter: NewWatcherConverter(params.MainnetChainConfig),
db: db,
triggerFunctions: triggerFunctions,
deleteCalls: 0,
@ -46,9 +53,25 @@ func NewRepository(db *postgres.DB, triggerFunctions []string) shared.Repository
}
// LoadTriggers is used to initialize Postgres trigger function
// this needs to be called after the wasm functions these triggers invoke have been instantiated
// this needs to be called after the wasm functions these triggers invoke have been instantiated in Postgres
func (r *Repository) LoadTriggers() error {
panic("implement me")
// TODO: enable loading of triggers from IPFS
tx, err := r.db.Beginx()
if err != nil {
return err
}
for _, funcPath := range r.triggerFunctions {
sqlFile, err := ioutil.ReadFile(funcPath)
if err != nil {
return err
}
sqlString := string(sqlFile)
if _, err := tx.Exec(sqlString); err != nil {
return err
}
}
return tx.Commit()
}
// QueueData puts super node payload data into the db queue
@ -59,14 +82,14 @@ func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error {
return err
}
// GetQueueData grabs a chunk super node payload data from the queue table so that it can
// be forwarded to the ready table
// this is used to make sure we enter data into the ready table in sequential order
// GetQueueData grabs payload data from the queue table so that it can
// be forwarded to the ready tables
// this is used to make sure we enter data into the tables that triggers act on in sequential order
// even if we receive data out-of-order
// it returns the new index
// delete the data it retrieves so as to clear the queue
// periodically vacuum's the table to free up space from the deleted rows
func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) {
r.deleteCalls++
pgStr := `DELETE FROM eth.queued_data
WHERE height = $1
RETURNING *`
@ -74,13 +97,15 @@ func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload,
if err := r.db.Get(&res, pgStr, height); err != nil {
return super_node.SubscriptionPayload{}, height, err
}
// If the delete get query succeeded, increment deleteCalls and height and prep payload to return
r.deleteCalls++
height++
payload := super_node.SubscriptionPayload{
Data: res.Data,
Height: res.Height,
Flag: super_node.EmptyFlag,
}
height++
// Periodically clean up space in the queue table
// Periodically clean up space in the queued data table
if r.deleteCalls >= vacuumThreshold {
_, err := r.db.Exec(`VACUUM ANALYZE eth.queued_data`)
if err != nil {
@ -91,31 +116,77 @@ func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload,
return payload, height, nil
}
// ReadyData puts super node payload data in the tables ready for processing by trigger functions
// ReadyData puts data in the tables ready for processing by trigger functions
func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error {
panic("implement me")
var ethIPLDs eth.IPLDs
if err := rlp.DecodeBytes(payload.Data, &ethIPLDs); err != nil {
return err
}
if err := r.readyIPLDs(ethIPLDs); err != nil {
return err
}
cids, err := r.converter.Convert(ethIPLDs)
if err != nil {
return err
}
// Use indexer to persist all of the cid meta data
// trigger functions will act on these tables
return r.cidIndexer.Index(cids)
}
func (r *Repository) readyHeader(header *types.Header) error {
panic("implement me")
}
func (r *Repository) readyUncle(uncle *types.Header) error {
panic("implement me")
}
func (r *Repository) readyTxs(transactions types.Transactions) error {
panic("implement me")
}
func (r *Repository) readyRcts(receipts types.Receipts) error {
panic("implement me")
}
func (r *Repository) readyState(stateNodes map[common.Address][]byte) error {
panic("implement me")
}
func (r *Repository) readyStorage(storageNodes map[common.Address]map[common.Address][]byte) error {
panic("implement me")
// readyIPLDs adds IPLDs directly to the Postgres `blocks` table, rather than going through an IPFS node
func (r *Repository) readyIPLDs(ethIPLDs eth.IPLDs) error {
tx, err := r.db.Beginx()
if err != nil {
return err
}
pgStr := `INSERT INTO blocks (key, data) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET (data) = ($2)`
if _, err := tx.Exec(pgStr, ethIPLDs.Header.CID, ethIPLDs.Header.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
for _, uncle := range ethIPLDs.Uncles {
if _, err := tx.Exec(pgStr, uncle.CID, uncle.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
for _, trx := range ethIPLDs.Transactions {
if _, err := tx.Exec(pgStr, trx.CID, trx.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
for _, rct := range ethIPLDs.Receipts {
if _, err := tx.Exec(pgStr, rct.CID, rct.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
for _, state := range ethIPLDs.StateNodes {
if _, err := tx.Exec(pgStr, state.IPLD.CID, state.IPLD.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
for _, storage := range ethIPLDs.StorageNodes {
if _, err := tx.Exec(pgStr, storage.IPLD.CID, storage.IPLD.Data); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
return nil
}

View File

@ -0,0 +1,10 @@
CREATE TABLE eth.token_transfers (
id SERIAL PRIMARY KEY,
receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
log_index INTEGER NOT NULL,
contract_address VARCHAR(66) NOT NULL,
src VARCHAR(66) NOT NULL,
dst VARCHAR(66) NOT NULL,
amount NUMERIC NOT NULL,
UNIQUE (receipt_id, log_index)
);

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION transfer_trigger() RETURNS trigger AS
$BODY$
BEGIN
SELECT *
END;
$BODY$

View File

@ -53,10 +53,8 @@ type Service struct {
QuitChan chan bool
// Indexes
// use atomic operations on these ONLY
payloadIndex *int64
endingIndex *int64
backFilling *int32 // 0 => not backfilling; 1 => backfilling
endingIndex int64
}
// NewWatcher returns a new Service which satisfies the Watcher interface
@ -100,13 +98,7 @@ func (s *Service) Watch(wg *sync.WaitGroup) error {
return err
}
atomic.StoreInt64(s.payloadIndex, s.WatcherConfig.SubscriptionConfig.StartingBlock().Int64())
atomic.StoreInt64(s.endingIndex, s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64()) // less than 0 => never end
backFilling := s.WatcherConfig.SubscriptionConfig.HistoricalData()
if backFilling {
atomic.StoreInt32(s.backFilling, 1)
} else {
atomic.StoreInt32(s.backFilling, 0)
}
s.endingIndex = s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64() // less than 0 => never end
backFillOnly := s.WatcherConfig.SubscriptionConfig.HistoricalDataOnly()
if backFillOnly { // we are only processing historical data => handle single contiguous stream
s.backFillOnlyQueuing(wg, sub)
@ -123,7 +115,7 @@ func (s *Service) Watch(wg *sync.WaitGroup) error {
// NOTE: maybe we should push everything to the wait queue, otherwise the index could be shifted as we retrieve data from it
func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
wg.Add(1)
// this goroutine is responsible for allocating incoming data to the ready or wait queue
// This goroutine is responsible for allocating incoming data to the ready or wait queue
// depending on if it is at the current index or not
forwardQuit := make(chan bool)
go func() {
@ -141,8 +133,14 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
if err := s.Repository.ReadyData(payload); err != nil {
logrus.Error(err)
}
atomic.AddInt64(s.payloadIndex, 1)
} else { // otherwise add it to the wait queue
// Increment the current index and if we have exceeded our ending height shut down the watcher
if atomic.AddInt64(s.payloadIndex, 1) > s.endingIndex {
logrus.Info("Watcher has reached ending block height, shutting down")
forwardQuit <- true
wg.Done()
return
}
} else { // Otherwise add it to the wait queue
if err := s.Repository.QueueData(payload); err != nil {
logrus.Error(err)
}
@ -158,27 +156,29 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio
}
}()
ticker := time.NewTicker(5 * time.Second)
// this goroutine is responsible for moving data from the wait queue to the ready queue
// This goroutine is responsible for moving data from the wait queue to the ready queue
// preserving the correct order and alignment with the current index
go func() {
for {
select {
case <-ticker.C:
// retrieve queued data, in order, and forward it to the ready queue
index := atomic.LoadInt64(s.payloadIndex)
queueData, newIndex, err := s.Repository.GetQueueData(index)
// Retrieve queued data, in order, and forward it to the ready queue
queueData, newIndex, err := s.Repository.GetQueueData(atomic.LoadInt64(s.payloadIndex))
if err != nil {
logrus.Error(err)
continue
}
atomic.StoreInt64(s.payloadIndex, newIndex)
if atomic.LoadInt64(s.payloadIndex) > s.endingIndex {
s.QuitChan <- true
}
if err := s.Repository.ReadyData(queueData); err != nil {
logrus.Error(err)
}
case <-forwardQuit:
return
default:
// do nothing, wait til next tick
// Do nothing, wait til next tick
}
}
}()