From 4036d9d6a03276ccc0862963adc0bb6b052d0cf8 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 4 Nov 2019 13:14:05 -0600 Subject: [PATCH] review fixes --- cmd/root.go | 4 +-- cmd/streamSubscribe.go | 4 +-- cmd/syncAndPublish.go | 6 ++-- db/schema.sql | 2 +- documentation/custom-transformers.md | 4 +++ documentation/super-node.md | 6 ++-- go.mod | 5 --- .../shared/streamer/super_node_streamer.go | 4 +-- ...ansformer.go => super_node_transformer.go} | 2 +- pkg/super_node/backfiller.go | 36 ++++++------------- pkg/super_node/service.go | 4 +-- 11 files changed, 31 insertions(+), 46 deletions(-) rename libraries/shared/transformer/{seed_node_transformer.go => super_node_transformer.go} (94%) diff --git a/cmd/root.go b/cmd/root.go index 07e3ecf9..17922b30 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -181,11 +181,11 @@ func getClients() (client.RPCClient, *ethclient.Client) { func getWSClient() core.RPCClient { wsRPCpath := viper.GetString("client.wsPath") if wsRPCpath == "" { - LogWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided")) + logWithCommand.Fatal(errors.New("getWSClient() was called but no ws rpc path is provided")) } wsRPCClient, dialErr := rpc.Dial(wsRPCpath) if dialErr != nil { - LogWithCommand.Fatal(dialErr) + logWithCommand.Fatal(dialErr) } return client.NewRPCClient(wsRPCClient, wsRPCpath) } diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index c6454072..e6f97e89 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -216,7 +216,7 @@ func configureSubscription() { } } -func getRPCClient() core.RpcClient { +func getRPCClient() core.RPCClient { vulcPath := viper.GetString("subscription.path") if vulcPath == "" { vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided @@ -225,5 +225,5 @@ func getRPCClient() core.RpcClient { if err != nil { logWithCommand.Fatal(err) } - return client.NewRpcClient(rawRPCClient, vulcPath) + return client.NewRPCClient(rawRPCClient, vulcPath) } diff --git a/cmd/syncAndPublish.go b/cmd/syncAndPublish.go index e2126f96..08519d12 100644 --- a/cmd/syncAndPublish.go +++ b/cmd/syncAndPublish.go @@ -77,16 +77,16 @@ func syncAndPublish() { wg.Wait() // If an error was thrown, wg.Add was never called and this will fall through } -func getBlockChainAndClient(path string) (*eth.BlockChain, core.RpcClient) { +func getBlockChainAndClient(path string) (*eth.BlockChain, core.RPCClient) { rawRPCClient, dialErr := rpc.Dial(path) if dialErr != nil { logWithCommand.Fatal(dialErr) } - rpcClient := client.NewRpcClient(rawRPCClient, ipc) + rpcClient := client.NewRPCClient(rawRPCClient, ipc) ethClient := ethclient.NewClient(rawRPCClient) vdbEthClient := client.NewEthClient(ethClient) vdbNode := node.MakeNode(rpcClient) - transactionConverter := vRpc.NewRpcTransactionConverter(ethClient) + transactionConverter := vRpc.NewRPCTransactionConverter(ethClient) blockChain := eth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter) return blockChain, rpcClient } diff --git a/db/schema.sql b/db/schema.sql index 66569a53..763b5373 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -2,7 +2,7 @@ -- PostgreSQL database dump -- --- Dumped from database version 10.10 +-- Dumped from database version 11.5 -- Dumped by pg_dump version 11.5 SET statement_timeout = 0; diff --git a/documentation/custom-transformers.md b/documentation/custom-transformers.md index d45afcdf..33894856 100644 --- a/documentation/custom-transformers.md +++ b/documentation/custom-transformers.md @@ -100,7 +100,11 @@ The config provides information for composing a set of transformers from externa [client] ipcPath = "/Users/user/Library/Ethereum/geth.ipc" +<<<<<<< HEAD wsPath = "ws://127.0.0.1:8546" +======= + wsPath = "http://127.0.0.1:" +>>>>>>> review fixes [exporter] home = "github.com/vulcanize/vulcanizedb" diff --git a/documentation/super-node.md b/documentation/super-node.md index cf53d9f3..d33c595b 100644 --- a/documentation/super-node.md +++ b/documentation/super-node.md @@ -150,7 +150,7 @@ The config file for the `syncPublishScreenAndServe` command has two additional f ipcPath = "/Users/user/.vulcanize/vulcanize.ipc" wsEndpoint = "127.0.0.1:80" -[backfill] +[superNodeBackFill] on = false ipcPath = "" frequency = 5 @@ -163,9 +163,9 @@ be known by transformers for them to subscribe to the super node. Because the super node syncs data from a geth full node as it progresses through its block synchronization, there is potential for the super node to miss data both at the beginning of the sync due to lag between initialization of the two processes and -anywhere throughout the sync if the processes are interrupted. The `backfill` config mapping is used to optionally configure +anywhere throughout the sync if the processes are interrupted. The `superNodeBackFill` config mapping is used to optionally configure the super node with an archival geth client that exposes a `statediff.StateDiffAt` rpc endpoint, to enable it to fill in these data gaps. -`backfill.on` turns the backfill process on, the `backfill.ipcPath` is the rpc path for the archival geth node, and `backfill.frequency` +`superNodeBackFill.on` turns the backfill process on, the `superNodeBackFill.ipcPath` is the rpc path for the archival geth node, and `superNodeBackFill.frequency` sets at what frequency (in minutes) the backfill process checks for and fills in gaps. diff --git a/go.mod b/go.mod index 02604f3d..3a5bb0ac 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,6 @@ require ( github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 github.com/fsnotify/fsnotify v1.4.7 github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff - github.com/go-logfmt/logfmt v0.4.0 // indirect github.com/go-stack/stack v1.8.0 github.com/gogo/protobuf v1.2.1 github.com/golang/protobuf v1.3.2 @@ -117,7 +116,6 @@ require ( github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.1.3 github.com/libp2p/go-libp2p-pnet v0.1.0 - github.com/libp2p/go-libp2p-protocol v0.1.0 // indirect github.com/libp2p/go-libp2p-pubsub v0.1.0 github.com/libp2p/go-libp2p-pubsub-router v0.1.0 github.com/libp2p/go-libp2p-quic-transport v0.1.1 @@ -135,10 +133,8 @@ require ( github.com/libp2p/go-nat v0.0.3 github.com/libp2p/go-reuseport v0.0.1 github.com/libp2p/go-reuseport-transport v0.0.2 - github.com/libp2p/go-stream-muxer v0.1.0 // indirect github.com/libp2p/go-stream-muxer-multistream v0.2.0 github.com/libp2p/go-tcp-transport v0.1.0 - github.com/libp2p/go-testutil v0.1.0 // indirect github.com/libp2p/go-ws-transport v0.1.0 github.com/libp2p/go-yamux v1.2.3 github.com/lucas-clemente/quic-go v0.11.2 @@ -214,7 +210,6 @@ require ( golang.org/x/text v0.3.2 golang.org/x/tools v0.0.0-20190802003818-e9bb7d36c060 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 - google.golang.org/appengine v1.4.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 diff --git a/libraries/shared/streamer/super_node_streamer.go b/libraries/shared/streamer/super_node_streamer.go index 2a61675b..f82921f2 100644 --- a/libraries/shared/streamer/super_node_streamer.go +++ b/libraries/shared/streamer/super_node_streamer.go @@ -35,11 +35,11 @@ type ISuperNodeStreamer interface { // SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface type SuperNodeStreamer struct { - Client core.RpcClient + Client core.RPCClient } // NewSuperNodeStreamer creates a pointer to a new SuperNodeStreamer which satisfies the ISuperNodeStreamer interface -func NewSuperNodeStreamer(client core.RpcClient) *SuperNodeStreamer { +func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer { return &SuperNodeStreamer{ Client: client, } diff --git a/libraries/shared/transformer/seed_node_transformer.go b/libraries/shared/transformer/super_node_transformer.go similarity index 94% rename from libraries/shared/transformer/seed_node_transformer.go rename to libraries/shared/transformer/super_node_transformer.go index 0564b60d..9e28cf9c 100644 --- a/libraries/shared/transformer/seed_node_transformer.go +++ b/libraries/shared/transformer/super_node_transformer.go @@ -28,4 +28,4 @@ type SuperNodeTransformer interface { GetConfig() config.Subscription } -type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RpcClient) SuperNodeTransformer +type SuperNodeTransformerInitializer func(db *postgres.DB, subCon config.Subscription, client core.RPCClient) SuperNodeTransformer diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index cd016477..55e10d41 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -26,13 +26,14 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) const ( - DefaultMaxBatchSize uint64 = 1000 + DefaultMaxBatchSize uint64 = 100 defaultMaxBatchNumber int64 = 10 ) @@ -61,7 +62,7 @@ type BackFillService struct { } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RpcClient, freq time.Duration, batchSize uint64) (BackFillInterface, error) { +func NewBackFillService(ipfsPath string, db *postgres.DB, archivalNodeRPCClient core.RPCClient, freq time.Duration, batchSize uint64) (BackFillInterface, error) { publisher, err := ipfs.NewIPLDPublisher(ipfsPath) if err != nil { return nil, err @@ -120,7 +121,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) { log.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) errChan := make(chan error) done := make(chan bool) - backFillInitErr := bfs.BackFill(startingBlock, endingBlock, errChan, done) + backFillInitErr := bfs.backFill(startingBlock, endingBlock, errChan, done) if backFillInitErr != nil { log.Error(backFillInitErr) return @@ -136,33 +137,18 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) { } } -// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks +// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently -func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { +func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { if endingBlock < startingBlock { return errors.New("backfill: ending block number needs to be greater than starting block number") } + // // break the range up into bins of smaller ranges - length := endingBlock - startingBlock + 1 - numberOfBins := length / bfs.BatchSize - remainder := length % bfs.BatchSize - if remainder != 0 { - numberOfBins++ + blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) + if err != nil { + return err } - blockRangeBins := make([][]uint64, numberOfBins) - for i := range blockRangeBins { - nextBinStart := startingBlock + uint64(bfs.BatchSize) - if nextBinStart > endingBlock { - nextBinStart = endingBlock + 1 - } - blockRange := make([]uint64, 0, nextBinStart-startingBlock+1) - for j := startingBlock; j < nextBinStart; j++ { - blockRange = append(blockRange, j) - } - startingBlock = nextBinStart - blockRangeBins[i] = blockRange - } - // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have var activeCount int64 // channel for processing goroutines to signal when they are done @@ -221,7 +207,7 @@ func (bfs *BackFillService) BackFill(startingBlock, endingBlock uint64, errChan } log.Infof("finished filling in gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1]) goroutinesFinished++ - if goroutinesFinished == int(numberOfBins) { + if goroutinesFinished >= len(blockRangeBins) { done <- true return } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 8aada6ee..41de906c 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -63,7 +63,7 @@ type Service struct { // Used to sync access to the Subscriptions sync.Mutex // Interface for streaming statediff payloads over a geth rpc subscription - Streamer streamer.IStateDiffStreamer + Streamer streamer.Streamer // Interface for converting statediff payloads into ETH-IPLD object payloads Converter ipfs.PayloadConverter // Interface for publishing the ETH-IPLD payloads to IPFS @@ -93,7 +93,7 @@ type Service struct { } // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct -func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RpcClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { +func NewSuperNode(ipfsPath string, db *postgres.DB, rpcClient core.RPCClient, qc chan bool, workers int, node core.Node) (NodeInterface, error) { ipfsInitErr := ipfs.InitIPFSPlugins() if ipfsInitErr != nil { return nil, ipfsInitErr