fixes after tests

This commit is contained in:
Ian Norden 2020-04-20 08:14:02 -05:00
parent eceaa0aecb
commit 43c254b5f6
16 changed files with 214 additions and 82 deletions

View File

@ -13,8 +13,8 @@
type = "state" # $RESYNC_TYPE type = "state" # $RESYNC_TYPE
start = 0 # $RESYNC_START start = 0 # $RESYNC_START
stop = 0 # $RESYNC_STOP stop = 0 # $RESYNC_STOP
batchSize = 10 # $RESYNC_BATCH_SIZE batchSize = 5 # $RESYNC_BATCH_SIZE
batchNumber = 100 # $RESYNC_BATCH_NUMBER batchNumber = 50 # $RESYNC_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT timeout = 300 # $HTTP_TIMEOUT
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION resetValidation = true # $RESYNC_RESET_VALIDATION

View File

@ -21,6 +21,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/client"
) )
@ -65,7 +67,7 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
} }
// BatchCallContext mockClient method to simulate batch call to geth // BatchCallContext mockClient method to simulate batch call to geth
func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []client.BatchElem) error { func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
if mc.MappedStateDiffAt == nil { if mc.MappedStateDiffAt == nil {
return errors.New("mockclient needs to be initialized with statediff payloads and errors") return errors.New("mockclient needs to be initialized with statediff payloads and errors")
} }

View File

@ -0,0 +1,44 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"context"
"github.com/ethereum/go-ethereum/rpc"
)
type StreamClient struct {
passedContext context.Context
passedResult interface{}
passedNamespace string
passedPayloadChan interface{}
passedSubscribeArgs []interface{}
}
func (client *StreamClient) Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
client.passedNamespace = namespace
client.passedPayloadChan = payloadChan
client.passedContext = ctx
for _, arg := range args {
client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg)
}
subscription := rpc.ClientSubscription{}
return &subscription, nil
}

View File

@ -16,7 +16,11 @@
package utils package utils
import "errors" import (
"errors"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) { func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
if endingBlock < startingBlock { if endingBlock < startingBlock {
@ -43,3 +47,26 @@ func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint6
} }
return blockRangeBins, nil return blockRangeBins, nil
} }
func MissingHeightsToGaps(heights []uint64) []shared.Gap {
validationGaps := make([]shared.Gap, 0)
start := heights[0]
lastHeight := start
for i, height := range heights[1:] {
if height != lastHeight+1 {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: lastHeight,
})
start = height
}
if i+2 == len(heights) {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: height,
})
}
lastHeight = height
}
return validationGaps
}

View File

@ -62,7 +62,7 @@ type BackFillService struct {
// Channel for receiving quit signal // Channel for receiving quit signal
QuitChan chan bool QuitChan chan bool
// Chain type // Chain type
chain shared.ChainType Chain shared.ChainType
// Headers with times_validated lower than this will be resynced // Headers with times_validated lower than this will be resynced
validationLevel int validationLevel int
} }
@ -108,7 +108,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
BatchNumber: int64(batchNumber), BatchNumber: int64(batchNumber),
ScreenAndServeChan: screenAndServeChan, ScreenAndServeChan: screenAndServeChan,
QuitChan: settings.Quit, QuitChan: settings.Quit,
chain: settings.Chain, Chain: settings.Chain,
validationLevel: settings.ValidationLevel, validationLevel: settings.ValidationLevel,
}, nil }, nil
} }
@ -122,25 +122,25 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
for { for {
select { select {
case <-bfs.QuitChan: case <-bfs.QuitChan:
log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String())
wg.Done() wg.Done()
return return
case <-ticker.C: case <-ticker.C:
log.Infof("searching for gaps in the %s super node database", bfs.chain.String()) log.Infof("searching for gaps in the %s super node database", bfs.Chain.String())
startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
if err != nil { if err != nil {
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err) log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
continue continue
} }
if startingBlock != 0 { if startingBlock != 0 && bfs.Chain == shared.Bitcoin || startingBlock != 1 && bfs.Chain == shared.Ethereum {
log.Infof("found gap at the beginning of the %s sync", bfs.chain.String()) log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
log.Error(err) log.Error(err)
} }
} }
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
if err != nil { if err != nil {
log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err) log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err)
continue continue
} }
for _, gap := range gaps { for _, gap := range gaps {
@ -151,15 +151,15 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
} }
} }
}() }()
log.Infof("%s fillGaps goroutine successfully spun up", bfs.chain.String()) log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String())
} }
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
log.Infof("filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock)
if endingBlock < startingBlock { if endingBlock < startingBlock {
return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String())
} }
// break the range up into bins of smaller ranges // break the range up into bins of smaller ranges
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
@ -184,12 +184,12 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
go func(blockHeights []uint64) { go func(blockHeights []uint64) {
payloads, err := bfs.Fetcher.FetchAt(blockHeights) payloads, err := bfs.Fetcher.FetchAt(blockHeights)
if err != nil { if err != nil {
log.Errorf("%s super node historical data fetcher error: %s", bfs.chain.String(), err.Error()) log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error())
} }
for _, payload := range payloads { for _, payload := range payloads {
ipldPayload, err := bfs.Converter.Convert(payload) ipldPayload, err := bfs.Converter.Convert(payload)
if err != nil { if err != nil {
log.Errorf("%s super node historical data converter error: %s", bfs.chain.String(), err.Error()) log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error())
} }
// If there is a ScreenAndServe process listening, forward payload to it // If there is a ScreenAndServe process listening, forward payload to it
select { select {
@ -198,14 +198,14 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
} }
cidPayload, err := bfs.Publisher.Publish(ipldPayload) cidPayload, err := bfs.Publisher.Publish(ipldPayload)
if err != nil { if err != nil {
log.Errorf("%s super node historical data publisher error: %s", bfs.chain.String(), err.Error()) log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
} }
if err := bfs.Indexer.Index(cidPayload); err != nil { if err := bfs.Indexer.Index(cidPayload); err != nil {
log.Errorf("%s super node historical data indexer error: %s", bfs.chain.String(), err.Error()) log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
} }
} }
// when this goroutine is done, send out a signal // when this goroutine is done, send out a signal
log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
processingDone <- true processingDone <- true
}(blockHeights) }(blockHeights)
} }

View File

@ -45,7 +45,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil, ReturnErr: nil,
} }
mockRetriever := &mocks2.CIDRetriever{ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 0, FirstBlockNumberToReturn: 1,
GapsToRetrieve: []shared.Gap{ GapsToRetrieve: []shared.Gap{
{ {
Start: 100, Stop: 101, Start: 100, Stop: 101,
@ -69,6 +69,7 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg) backfiller.FillGapsInSuperNode(wg)
@ -101,7 +102,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil, ReturnErr: nil,
} }
mockRetriever := &mocks2.CIDRetriever{ mockRetriever := &mocks2.CIDRetriever{
FirstBlockNumberToReturn: 0, FirstBlockNumberToReturn: 1,
GapsToRetrieve: []shared.Gap{ GapsToRetrieve: []shared.Gap{
{ {
Start: 100, Stop: 100, Start: 100, Stop: 100,
@ -124,6 +125,7 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg) backfiller.FillGapsInSuperNode(wg)
@ -173,6 +175,7 @@ var _ = Describe("BackFiller", func() {
BatchSize: super_node.DefaultMaxBatchSize, BatchSize: super_node.DefaultMaxBatchSize,
BatchNumber: super_node.DefaultMaxBatchNumber, BatchNumber: super_node.DefaultMaxBatchNumber,
QuitChan: quitChan, QuitChan: quitChan,
Chain: shared.Ethereum,
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg) backfiller.FillGapsInSuperNode(wg)

View File

@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
SET times_validated = 0 SET times_validated = 0
WHERE block_number BETWEEN $1 AND $2` WHERE block_number BETWEEN $1 AND $2`
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err return err
} }
} }

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -194,22 +196,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
if len(heights) == 0 { if len(heights) == 0 {
return emptyGaps, nil return emptyGaps, nil
} }
validationGaps := make([]shared.Gap, 0) return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
start := heights[0]
lastHeight := start
for _, height := range heights[1:] {
if height == lastHeight+1 {
lastHeight = height
continue
}
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: lastHeight,
})
start = height
lastHeight = start
}
return append(emptyGaps, validationGaps...), nil
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash

View File

@ -91,13 +91,6 @@ func NewSuperNodeConfig() (*Config, error) {
viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH) viper.BindEnv("superNode.ipcPath", SUPERNODE_IPC_PATH)
viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH) viper.BindEnv("superNode.httpPath", SUPERNODE_HTTP_PATH)
viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL) viper.BindEnv("superNode.backFill", SUPERNODE_BACKFILL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
chain := viper.GetString("superNode.chain") chain := viper.GetString("superNode.chain")
c.Chain, err = shared.NewChainType(chain) c.Chain, err = shared.NewChainType(chain)
@ -178,6 +171,13 @@ func (c *Config) BackFillFields() error {
viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE)
viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER)
viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL)
viper.BindEnv("superNode.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("superNode.timeout")
if timeout < 15 {
timeout = 15
}
c.Timeout = time.Second * time.Duration(timeout)
switch c.Chain { switch c.Chain {
case shared.Ethereum: case shared.Ethereum:

View File

@ -21,12 +21,10 @@ import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/rpcclient"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc" "github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
@ -73,10 +71,9 @@ func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriev
func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) {
switch chain { switch chain {
case shared.Ethereum: case shared.Ethereum:
ethClient, ok := clientOrConfig.(core.RPCClient) ethClient, ok := clientOrConfig.(*rpc.Client)
if !ok { if !ok {
var expectedClientType core.RPCClient return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", &rpc.Client{}, clientOrConfig)
return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, clientOrConfig)
} }
streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize)
return eth.NewPayloadStreamer(ethClient), streamChan, nil return eth.NewPayloadStreamer(ethClient), streamChan, nil
@ -96,10 +93,9 @@ func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (sha
func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) { func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error) {
switch chain { switch chain {
case shared.Ethereum: case shared.Ethereum:
batchClient, ok := client.(eth.BatchClient) batchClient, ok := client.(*rpc.Client)
if !ok { if !ok {
var expectedClient eth.BatchClient return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", &rpc.Client{}, client)
return nil, fmt.Errorf("ethereum payload fetcher constructor expected client type %T got %T", expectedClient, client)
} }
return eth.NewPayloadFetcher(batchClient, timeout), nil return eth.NewPayloadFetcher(batchClient, timeout), nil
case shared.Bitcoin: case shared.Bitcoin:

View File

@ -50,6 +50,9 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
SET times_validated = 0 SET times_validated = 0
WHERE block_number BETWEEN $1 AND $2` WHERE block_number BETWEEN $1 AND $2`
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err return err
} }
} }

View File

@ -21,15 +21,15 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion // BatchClient is an interface to a batch-fetching geth rpc client; created to allow mock insertion
type BatchClient interface { type BatchClient interface {
BatchCallContext(ctx context.Context, batch []client.BatchElem) error BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error
} }
// PayloadFetcher satisfies the PayloadFetcher interface for ethereum // PayloadFetcher satisfies the PayloadFetcher interface for ethereum
@ -53,9 +53,9 @@ func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
// FetchAt fetches the statediff payloads at the given block heights // FetchAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
batch := make([]client.BatchElem, 0) batch := make([]rpc.BatchElem, 0)
for _, height := range blockHeights { for _, height := range blockHeights {
batch = append(batch, client.BatchElem{ batch = append(batch, rpc.BatchElem{
Method: method, Method: method,
Args: []interface{}{height}, Args: []interface{}{height},
Result: new(statediff.Payload), Result: new(statediff.Payload),

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
@ -479,22 +481,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
if len(heights) == 0 { if len(heights) == 0 {
return emptyGaps, nil return emptyGaps, nil
} }
validationGaps := make([]shared.Gap, 0) return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
start := heights[0]
lastHeight := start
for _, height := range heights[1:] {
if height == lastHeight+1 {
lastHeight = height
continue
}
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: lastHeight,
})
start = height
lastHeight = start
}
return append(emptyGaps, validationGaps...), nil
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash

View File

@ -544,8 +544,16 @@ var _ = Describe("Retriever", func() {
payload4.HeaderCID.BlockNumber = "101" payload4.HeaderCID.BlockNumber = "101"
payload5 := payload4 payload5 := payload4
payload5.HeaderCID.BlockNumber = "102" payload5.HeaderCID.BlockNumber = "102"
payload6 := payload5 payload6 := payload4
payload6.HeaderCID.BlockNumber = "1000" payload6.HeaderCID.BlockNumber = "103"
payload7 := payload4
payload7.HeaderCID.BlockNumber = "104"
payload8 := payload4
payload8.HeaderCID.BlockNumber = "105"
payload9 := payload4
payload9.HeaderCID.BlockNumber = "106"
payload10 := payload5
payload10.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2) err = repo.Index(&payload2)
@ -558,11 +566,76 @@ var _ = Describe("Retriever", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload6) err = repo.Index(&payload6)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload7)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload8)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload9)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload10)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1) gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(3)) Expect(len(gaps)).To(Equal(3))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 103, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
})
It("Finds validation level gaps", func() {
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101"
payload2 := payload1
payload2.HeaderCID.BlockNumber = "5"
payload3 := payload2
payload3.HeaderCID.BlockNumber = "100"
payload4 := payload3
payload4.HeaderCID.BlockNumber = "101"
payload5 := payload4
payload5.HeaderCID.BlockNumber = "102"
payload6 := payload4
payload6.HeaderCID.BlockNumber = "103"
payload7 := payload4
payload7.HeaderCID.BlockNumber = "104"
payload8 := payload4
payload8.HeaderCID.BlockNumber = "105"
payload9 := payload4
payload9.HeaderCID.BlockNumber = "106"
payload10 := payload5
payload10.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload4)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload5)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload6)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload7)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload8)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload9)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload10)
Expect(err).ToNot(HaveOccurred())
cleaner := eth.NewCleaner(db)
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}})
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(5))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
}) })
}) })

View File

@ -17,10 +17,12 @@
package eth package eth
import ( import (
"context"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
@ -28,13 +30,18 @@ const (
PayloadChanBufferSize = 20000 // the max eth sub buffer size PayloadChanBufferSize = 20000 // the max eth sub buffer size
) )
// StreamClient is an interface for subscribing and streaming from geth
type StreamClient interface {
Subscribe(ctx context.Context, namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
}
// PayloadStreamer satisfies the PayloadStreamer interface for ethereum // PayloadStreamer satisfies the PayloadStreamer interface for ethereum
type PayloadStreamer struct { type PayloadStreamer struct {
Client core.RPCClient Client StreamClient
} }
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum // NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
return &PayloadStreamer{ return &PayloadStreamer{
Client: client, Client: client,
} }
@ -53,5 +60,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
} }
} }
}() }()
return ps.Client.Subscribe("statediff", stateDiffChan, "stream") return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream")
} }

View File

@ -18,14 +18,14 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/eth/fakes" "github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
var _ = Describe("StateDiff Streamer", func() { var _ = Describe("StateDiff Streamer", func() {
It("subscribes to the geth statediff service", func() { It("subscribes to the geth statediff service", func() {
client := &fakes.MockRPCClient{} client := &mocks.StreamClient{}
streamer := eth.NewPayloadStreamer(client) streamer := eth.NewPayloadStreamer(client)
payloadChan := make(chan shared.RawChainData) payloadChan := make(chan shared.RawChainData)
_, err := streamer.Stream(payloadChan) _, err := streamer.Stream(payloadChan)