review fixes

This commit is contained in:
Ian Norden 2019-11-04 13:14:05 -06:00
parent 5be205ffa6
commit 4036d9d6a0
11 changed files with 31 additions and 46 deletions

View File

@ -181,11 +181,11 @@ func getClients() (client.RPCClient, *ethclient.Client) {
func getWSClient() core.RPCClient {
wsRPCpath := viper.GetString("client.wsPath")
if wsRPCpath == "" {
LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided"))
logWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided"))
}
wsRPCClient, dialErr := rpc.Dial(wsRPCpath)
if dialErr != nil {
LogWithCommand.Fatal(dialErr)
logWithCommand.Fatal(dialErr)
}
return client.NewRPCClient(wsRPCClient, wsRPCpath)
}

View File

@ -216,7 +216,7 @@ func configureSubscription() {
}
}
func getRPCClient() core.RpcClient {
func getRPCClient() core.RPCClient {
vulcPath := viper.GetString("subscription.path")
if vulcPath == "" {
vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
@ -225,5 +225,5 @@ func getRPCClient() core.RpcClient {
if err != nil {
logWithCommand.Fatal(err)
}
return client.NewRpcClient(rawRPCClient, vulcPath)
return client.NewRPCClient(rawRPCClient, vulcPath)
}

View File

@ -77,16 +77,16 @@ func syncAndPublish() {
wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through
}
func getBlockChainAndClient(path string) (*eth.BlockChain, core.RpcClient) {
func getBlockChainAndClient(path string) (*eth.BlockChain, core.RPCClient) {
rawRPCClient, dialErr := rpc.Dial(path)
if dialErr != nil {
logWithCommand.Fatal(dialErr)
}
rpcClient := client.NewRpcClient(rawRPCClient, ipc)
rpcClient := client.NewRPCClient(rawRPCClient, ipc)
ethClient := ethclient.NewClient(rawRPCClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
transactionConverter := vRpc.NewRPCTransactionConverter(ethClient)
blockChain := eth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
return blockChain, rpcClient
}

View File

@ -2,7 +2,7 @@
-- PostgreSQL database dump
--
-- Dumped from database version 10.10
-- Dumped from database version 11.5
-- Dumped by pg_dump version 11.5
SET statement_timeout = 0;

View File

@ -100,7 +100,11 @@ The config provides information for composing a set of transformers from externa
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
<<<<<<< HEAD
wsPath = "ws://127.0.0.1:8546"
=======
wsPath = "http://127.0.0.1:"
>>>>>>> review fixes
[exporter]
home = "github.com/vulcanize/vulcanizedb"

View File

@ -150,7 +150,7 @@ The config file for the `syncPublishScreenAndServe` command has two additional f
ipcPath = "/Users/user/.vulcanize/vulcanize.ipc"
wsEndpoint = "127.0.0.1:80"
[backfill]
[superNodeBackFill]
on = false
ipcPath = ""
frequency = 5
@ -163,9 +163,9 @@ be known by transformers for them to subscribe to the super node.
Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential
for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and
anywhere throughout the sync if the processes are interrupted. The `backfill` config mapping is used to optionally configure
anywhere throughout the sync if the processes are interrupted. The `superNodeBackFill` config mapping is used to optionally configure
the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps.
`backfill.on` turns the backfill process on, the `backfill.ipcPath` is the rpc path for the archival geth node, and `backfill.frequency`
`superNodeBackFill.on` turns the backfill process on, the `superNodeBackFill.ipcPath` is the rpc path for the archival geth node, and `superNodeBackFill.frequency`
sets at what frequency (in minutes) the backfill process checks for and fills in gaps.

5
go.mod
View File

@ -30,7 +30,6 @@ require (
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5
github.com/fsnotify/fsnotify v1.4.7
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/go-logfmt/logfmt v0.4.0 // indirect
github.com/go-stack/stack v1.8.0
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2
@ -117,7 +116,6 @@ require (
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.3
github.com/libp2p/go-libp2p-pnet v0.1.0
github.com/libp2p/go-libp2p-protocol v0.1.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.1.0
github.com/libp2p/go-libp2p-pubsub-router v0.1.0
github.com/libp2p/go-libp2p-quic-transport v0.1.1
@ -135,10 +133,8 @@ require (
github.com/libp2p/go-nat v0.0.3
github.com/libp2p/go-reuseport v0.0.1
github.com/libp2p/go-reuseport-transport v0.0.2
github.com/libp2p/go-stream-muxer v0.1.0 // indirect
github.com/libp2p/go-stream-muxer-multistream v0.2.0
github.com/libp2p/go-tcp-transport v0.1.0
github.com/libp2p/go-testutil v0.1.0 // indirect
github.com/libp2p/go-ws-transport v0.1.0
github.com/libp2p/go-yamux v1.2.3
github.com/lucas-clemente/quic-go v0.11.2
@ -214,7 +210,6 @@ require (
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7

View File

@ -35,11 +35,11 @@ type ISuperNodeStreamer interface {
// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface
type SuperNodeStreamer struct {
Client core.RpcClient
Client core.RPCClient
}
// NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface
func NewSuperNodeStreamer(client core.RpcClient) *SuperNodeStreamer {
func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer {
return &SuperNodeStreamer{
Client: client,
}

View File

@ -28,4 +28,4 @@ type SuperNodeTransformer interface {
GetConfig() config.Subscription
}
type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SuperNodeTransformer
type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RPCClient) SuperNodeTransformer

View File

@ -26,13 +26,14 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)
const (
DefaultMaxBatchSize uint64 = 1000
DefaultMaxBatchSize uint64 = 100
defaultMaxBatchNumber int64 = 10
)
@ -61,7 +62,7 @@ type BackFillService struct {
}
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RpcClient, freq time.Duration, batchSize uint64) (BackFillInterface, error) {
func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RPCClient, freq time.Duration, batchSize uint64) (BackFillInterface, error) {
publisher, err := ipfs.NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
@ -120,7 +121,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) {
log.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
errChan := make(chan error)
done := make(chan bool)
backFillInitErr := bfs.BackFill(startingBlock, endingBlock, errChan, done)
backFillInitErr := bfs.backFill(startingBlock, endingBlock, errChan, done)
if backFillInitErr != nil {
log.Error(backFillInitErr)
return
@ -136,33 +137,18 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) {
}
}
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error {
func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error {
if endingBlock < startingBlock {
return errors.New("backfill: ending block number needs to be greater than starting block number")
}
//
// break the range up into bins of smaller ranges
length := endingBlock - startingBlock + 1
numberOfBins := length / bfs.BatchSize
remainder := length % bfs.BatchSize
if remainder != 0 {
numberOfBins++
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
if err != nil {
return err
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := startingBlock + uint64(bfs.BatchSize)
if nextBinStart > endingBlock {
nextBinStart = endingBlock + 1
}
blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
for j := startingBlock; j < nextBinStart; j++ {
blockRange = append(blockRange, j)
}
startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
var activeCount int64
// channel for processing goroutines to signal when they are done
@ -221,7 +207,7 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan
}
log.Infof("finished filling in gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
goroutinesFinished++
if goroutinesFinished == int(numberOfBins) {
if goroutinesFinished >= len(blockRangeBins) {
done <- true
return
}

View File

@ -63,7 +63,7 @@ type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
// Interface for streaming statediff payloads over a geth rpc subscription
Streamer streamer.IStateDiffStreamer
Streamer streamer.Streamer
// Interface for converting statediff payloads into ETH-IPLD object payloads
Converter ipfs.PayloadConverter
// Interface for publishing the ETH-IPLD payloads to IPFS
@ -93,7 +93,7 @@ type Service struct {
}
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) {
func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RPCClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) {
ipfsInitErr := ipfs.InitIPFSPlugins()
if ipfsInitErr != nil {
return nil, ipfsInitErr