updat dep and serve cmd
This commit is contained in:
parent
9758770579
commit
dbcb2c0cf7
85
cmd/serve.go
85
cmd/serve.go
@ -18,22 +18,22 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
s "sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/serve"
|
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||||
v "github.com/vulcanize/ipld-eth-server/version"
|
v "github.com/vulcanize/ipld-eth-server/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// watchCmd represents the watch command
|
// serveCmd represents the serve command
|
||||||
var watchCmd = &cobra.Command{
|
var serveCmd = &cobra.Command{
|
||||||
Use: "watch",
|
Use: "serve",
|
||||||
Short: "serve chain data from PG-IPFS",
|
Short: "serve chain data from PG-IPFS",
|
||||||
Long: `This command configures a VulcanizeDB ipld-eth-server.
|
Long: `This command configures a VulcanizeDB ipld-eth-server.
|
||||||
|
|
||||||
@ -41,82 +41,81 @@ var watchCmd = &cobra.Command{
|
|||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
subCommand = cmd.CalledAs()
|
subCommand = cmd.CalledAs()
|
||||||
logWithCommand = *log.WithField("SubCommand", subCommand)
|
logWithCommand = *log.WithField("SubCommand", subCommand)
|
||||||
watch()
|
serve()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func watch() {
|
func serve() {
|
||||||
logWithCommand.Infof("running ipld-eth-server version: %s", v.VersionWithMeta)
|
logWithCommand.Infof("running ipld-eth-server version: %s", v.VersionWithMeta)
|
||||||
|
|
||||||
var forwardPayloadChan chan eth.ConvertedPayload
|
var forwardPayloadChan chan eth.ConvertedPayload
|
||||||
wg := new(s.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
logWithCommand.Debug("loading watcher configuration variables")
|
logWithCommand.Debug("loading server configuration variables")
|
||||||
watcherConfig, err := serve.NewConfig()
|
serverConfig, err := s.NewConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
logWithCommand.Infof("watcher config: %+v", watcherConfig)
|
logWithCommand.Infof("server config: %+v", serverConfig)
|
||||||
logWithCommand.Debug("initializing new watcher service")
|
logWithCommand.Debug("initializing new server service")
|
||||||
s, err := serve.NewServer(watcherConfig)
|
server, err := s.NewServer(serverConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logWithCommand.Info("starting up watcher servers")
|
logWithCommand.Info("starting up server servers")
|
||||||
forwardPayloadChan = make(chan eth.ConvertedPayload, serve.PayloadChanBufferSize)
|
forwardPayloadChan = make(chan eth.ConvertedPayload, s.PayloadChanBufferSize)
|
||||||
s.Serve(wg, forwardPayloadChan)
|
server.Serve(wg, forwardPayloadChan)
|
||||||
if err := startServers(s, watcherConfig); err != nil {
|
if err := startServers(server, serverConfig); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
shutdown := make(chan os.Signal)
|
shutdown := make(chan os.Signal)
|
||||||
signal.Notify(shutdown, os.Interrupt)
|
signal.Notify(shutdown, os.Interrupt)
|
||||||
<-shutdown
|
<-shutdown
|
||||||
s.Stop()
|
server.Stop()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServers(watcher serve.Server, settings *serve.Config) error {
|
func startServers(server s.Server, settings *s.Config) error {
|
||||||
logWithCommand.Debug("starting up IPC server")
|
logWithCommand.Debug("starting up IPC server")
|
||||||
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, watcher.APIs())
|
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logWithCommand.Debug("starting up WS server")
|
logWithCommand.Debug("starting up WS server")
|
||||||
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, watcher.APIs(), []string{"vdb"}, nil, true)
|
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
logWithCommand.Debug("starting up HTTP server")
|
logWithCommand.Debug("starting up HTTP server")
|
||||||
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, watcher.APIs(), []string{"eth"}, nil, nil, rpc.HTTPTimeouts{})
|
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, nil, rpc.HTTPTimeouts{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(watchCmd)
|
rootCmd.AddCommand(serveCmd)
|
||||||
|
|
||||||
// flags for all config variables
|
// flags for all config variables
|
||||||
watchCmd.PersistentFlags().String("watcher-ws-path", "", "vdb server ws path")
|
serveCmd.PersistentFlags().String("server-ws-path", "", "vdb server ws path")
|
||||||
watchCmd.PersistentFlags().String("watcher-http-path", "", "vdb server http path")
|
serveCmd.PersistentFlags().String("server-http-path", "", "vdb server http path")
|
||||||
watchCmd.PersistentFlags().String("watcher-ipc-path", "", "vdb server ipc path")
|
serveCmd.PersistentFlags().String("server-ipc-path", "", "vdb server ipc path")
|
||||||
|
|
||||||
watchCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
|
serveCmd.PersistentFlags().String("eth-ws-path", "", "ws url for ethereum node")
|
||||||
watchCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
|
serveCmd.PersistentFlags().String("eth-http-path", "", "http url for ethereum node")
|
||||||
watchCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
|
serveCmd.PersistentFlags().String("eth-node-id", "", "eth node id")
|
||||||
watchCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
|
serveCmd.PersistentFlags().String("eth-client-name", "", "eth client name")
|
||||||
watchCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
|
serveCmd.PersistentFlags().String("eth-genesis-block", "", "eth genesis block hash")
|
||||||
watchCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
|
serveCmd.PersistentFlags().String("eth-network-id", "", "eth network id")
|
||||||
|
|
||||||
// and their bindings
|
// and their bindings
|
||||||
viper.BindPFlag("watcher.wsPath", watchCmd.PersistentFlags().Lookup("watcher-ws-path"))
|
viper.BindPFlag("server.wsPath", serveCmd.PersistentFlags().Lookup("server-ws-path"))
|
||||||
viper.BindPFlag("watcher.httpPath", watchCmd.PersistentFlags().Lookup("watcher-http-path"))
|
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path"))
|
||||||
viper.BindPFlag("watcher.ipcPath", watchCmd.PersistentFlags().Lookup("watcher-ipc-path"))
|
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path"))
|
||||||
|
|
||||||
viper.BindPFlag("ethereum.wsPath", watchCmd.PersistentFlags().Lookup("eth-ws-path"))
|
viper.BindPFlag("ethereum.wsPath", serveCmd.PersistentFlags().Lookup("eth-ws-path"))
|
||||||
viper.BindPFlag("ethereum.httpPath", watchCmd.PersistentFlags().Lookup("eth-http-path"))
|
viper.BindPFlag("ethereum.httpPath", serveCmd.PersistentFlags().Lookup("eth-http-path"))
|
||||||
viper.BindPFlag("ethereum.nodeID", watchCmd.PersistentFlags().Lookup("eth-node-id"))
|
viper.BindPFlag("ethereum.nodeID", serveCmd.PersistentFlags().Lookup("eth-node-id"))
|
||||||
viper.BindPFlag("ethereum.clientName", watchCmd.PersistentFlags().Lookup("eth-client-name"))
|
viper.BindPFlag("ethereum.clientName", serveCmd.PersistentFlags().Lookup("eth-client-name"))
|
||||||
viper.BindPFlag("ethereum.genesisBlock", watchCmd.PersistentFlags().Lookup("eth-genesis-block"))
|
viper.BindPFlag("ethereum.genesisBlock", serveCmd.PersistentFlags().Lookup("eth-genesis-block"))
|
||||||
viper.BindPFlag("ethereum.networkID", watchCmd.PersistentFlags().Lookup("eth-network-id"))
|
viper.BindPFlag("ethereum.networkID", serveCmd.PersistentFlags().Lookup("eth-network-id"))
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
|
@ -29,9 +29,9 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth/mocks"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
|
@ -30,8 +30,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
ipfsethdb "github.com/vulcanize/pg-ipfs-ethdb"
|
ipfsethdb "github.com/vulcanize/pg-ipfs-ethdb"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,8 +26,8 @@ import (
|
|||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
)
|
)
|
||||||
|
@ -24,9 +24,9 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth/mocks"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
@ -465,210 +465,6 @@ var _ = Describe("Retriever", func() {
|
|||||||
Expect(num).To(Equal(int64(1010101)))
|
Expect(num).To(Equal(int64(1010101)))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("RetrieveGapsInData", func() {
|
|
||||||
It("Doesn't return gaps if there are none", func() {
|
|
||||||
payload0 := mocks.MockConvertedPayload
|
|
||||||
payload0.Block = newMockBlock(0)
|
|
||||||
payload1 := mocks.MockConvertedPayload
|
|
||||||
payload2 := payload1
|
|
||||||
payload2.Block = newMockBlock(2)
|
|
||||||
payload3 := payload2
|
|
||||||
payload3.Block = newMockBlock(3)
|
|
||||||
err := repo.Publish(payload0)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload2)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload3)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(0))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Returns the gap from 0 to the earliest block", func() {
|
|
||||||
payload := mocks.MockConvertedPayload
|
|
||||||
payload.Block = newMockBlock(5)
|
|
||||||
err := repo.Publish(payload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(1))
|
|
||||||
Expect(gaps[0].Start).To(Equal(uint64(0)))
|
|
||||||
Expect(gaps[0].Stop).To(Equal(uint64(4)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Can handle single block gaps", func() {
|
|
||||||
payload0 := mocks.MockConvertedPayload
|
|
||||||
payload0.Block = newMockBlock(0)
|
|
||||||
payload1 := mocks.MockConvertedPayload
|
|
||||||
payload3 := payload1
|
|
||||||
payload3.Block = newMockBlock(3)
|
|
||||||
err := repo.Publish(payload0)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload3)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(1))
|
|
||||||
Expect(gaps[0].Start).To(Equal(uint64(2)))
|
|
||||||
Expect(gaps[0].Stop).To(Equal(uint64(2)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Finds gap between two entries", func() {
|
|
||||||
payload1 := mocks.MockConvertedPayload
|
|
||||||
payload1.Block = newMockBlock(1010101)
|
|
||||||
payload2 := payload1
|
|
||||||
payload2.Block = newMockBlock(0)
|
|
||||||
err := repo.Publish(payload1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload2)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(1))
|
|
||||||
Expect(gaps[0].Start).To(Equal(uint64(1)))
|
|
||||||
Expect(gaps[0].Stop).To(Equal(uint64(1010100)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Finds gaps between multiple entries", func() {
|
|
||||||
payload1 := mocks.MockConvertedPayload
|
|
||||||
payload1.Block = newMockBlock(1010101)
|
|
||||||
payload2 := mocks.MockConvertedPayload
|
|
||||||
payload2.Block = newMockBlock(1)
|
|
||||||
payload3 := mocks.MockConvertedPayload
|
|
||||||
payload3.Block = newMockBlock(5)
|
|
||||||
payload4 := mocks.MockConvertedPayload
|
|
||||||
payload4.Block = newMockBlock(100)
|
|
||||||
payload5 := mocks.MockConvertedPayload
|
|
||||||
payload5.Block = newMockBlock(101)
|
|
||||||
payload6 := mocks.MockConvertedPayload
|
|
||||||
payload6.Block = newMockBlock(102)
|
|
||||||
payload7 := mocks.MockConvertedPayload
|
|
||||||
payload7.Block = newMockBlock(103)
|
|
||||||
payload8 := mocks.MockConvertedPayload
|
|
||||||
payload8.Block = newMockBlock(104)
|
|
||||||
payload9 := mocks.MockConvertedPayload
|
|
||||||
payload9.Block = newMockBlock(105)
|
|
||||||
payload10 := mocks.MockConvertedPayload
|
|
||||||
payload10.Block = newMockBlock(106)
|
|
||||||
payload11 := mocks.MockConvertedPayload
|
|
||||||
payload11.Block = newMockBlock(1000)
|
|
||||||
|
|
||||||
err := repo.Publish(payload1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload2)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload3)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload4)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload5)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload6)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload7)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload8)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload9)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload10)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload11)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(5))
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 107, Stop: 999})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Finds validation level gaps", func() {
|
|
||||||
|
|
||||||
payload1 := mocks.MockConvertedPayload
|
|
||||||
payload1.Block = newMockBlock(1010101)
|
|
||||||
payload2 := mocks.MockConvertedPayload
|
|
||||||
payload2.Block = newMockBlock(1)
|
|
||||||
payload3 := mocks.MockConvertedPayload
|
|
||||||
payload3.Block = newMockBlock(5)
|
|
||||||
payload4 := mocks.MockConvertedPayload
|
|
||||||
payload4.Block = newMockBlock(100)
|
|
||||||
payload5 := mocks.MockConvertedPayload
|
|
||||||
payload5.Block = newMockBlock(101)
|
|
||||||
payload6 := mocks.MockConvertedPayload
|
|
||||||
payload6.Block = newMockBlock(102)
|
|
||||||
payload7 := mocks.MockConvertedPayload
|
|
||||||
payload7.Block = newMockBlock(103)
|
|
||||||
payload8 := mocks.MockConvertedPayload
|
|
||||||
payload8.Block = newMockBlock(104)
|
|
||||||
payload9 := mocks.MockConvertedPayload
|
|
||||||
payload9.Block = newMockBlock(105)
|
|
||||||
payload10 := mocks.MockConvertedPayload
|
|
||||||
payload10.Block = newMockBlock(106)
|
|
||||||
payload11 := mocks.MockConvertedPayload
|
|
||||||
payload11.Block = newMockBlock(107)
|
|
||||||
payload12 := mocks.MockConvertedPayload
|
|
||||||
payload12.Block = newMockBlock(108)
|
|
||||||
payload13 := mocks.MockConvertedPayload
|
|
||||||
payload13.Block = newMockBlock(109)
|
|
||||||
payload14 := mocks.MockConvertedPayload
|
|
||||||
payload14.Block = newMockBlock(1000)
|
|
||||||
|
|
||||||
err := repo.Publish(payload1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload2)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload3)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload4)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload5)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload6)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload7)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload8)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload9)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload10)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload11)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload12)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload13)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
err = repo.Publish(payload14)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
|
|
||||||
cleaner := eth2.NewDBCleaner(db)
|
|
||||||
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}})
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(len(gaps)).To(Equal(8))
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 0, Stop: 0})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 2, Stop: 4})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 6, Stop: 99})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 101, Stop: 102})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 104, Stop: 104})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 106, Stop: 108})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 110, Stop: 999})).To(BeTrue())
|
|
||||||
Expect(shared.ListContainsGap(gaps, eth2.DBGap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
func newMockBlock(blockNumber uint64) *types.Block {
|
func newMockBlock(blockNumber uint64) *types.Block {
|
||||||
|
@ -26,9 +26,9 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/statediff"
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Filterer interface for substituing mocks in tests
|
// Filterer interface for substituing mocks in tests
|
||||||
|
@ -23,8 +23,8 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth/mocks"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
|
@ -25,9 +25,9 @@ import (
|
|||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
)
|
)
|
||||||
|
@ -20,9 +20,9 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth/mocks"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
|
@ -19,8 +19,8 @@ package eth
|
|||||||
import (
|
import (
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TearDownDB is used to tear down the watcher dbs after tests
|
// TearDownDB is used to tear down the watcher dbs after tests
|
||||||
|
@ -23,8 +23,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
|
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
v "github.com/vulcanize/ipld-eth-server/version"
|
v "github.com/vulcanize/ipld-eth-server/version"
|
||||||
@ -87,12 +85,6 @@ func (api *PublicServerAPI) Stream(ctx context.Context, params eth.SubscriptionS
|
|||||||
return rpcSub, nil
|
return rpcSub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node is a public rpc method to allow transformers to fetch the node info for the watcher
|
|
||||||
// NOTE: this is the node info for the node that the watcher is syncing from, not the node info for the watcher itself
|
|
||||||
func (api *PublicServerAPI) Node() *node.Info {
|
|
||||||
return api.w.Node()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Chain returns the chain type that this watcher instance supports
|
// Chain returns the chain type that this watcher instance supports
|
||||||
func (api *PublicServerAPI) Chain() shared.ChainType {
|
func (api *PublicServerAPI) Chain() shared.ChainType {
|
||||||
return api.w.Chain()
|
return api.w.Chain()
|
||||||
|
@ -20,12 +20,13 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/vulcanize/ipld-eth-indexer/pkg/node"
|
||||||
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/utils"
|
"github.com/vulcanize/ipld-eth-indexer/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Env variables
|
// Env variables
|
||||||
@ -79,7 +80,7 @@ func NewConfig() (*Config, error) {
|
|||||||
}
|
}
|
||||||
c.HTTPEndpoint = httpPath
|
c.HTTPEndpoint = httpPath
|
||||||
overrideDBConnConfig(&c.DBConfig)
|
overrideDBConnConfig(&c.DBConfig)
|
||||||
serveDB := utils.LoadPostgres(c.DBConfig, postgres.Info{})
|
serveDB := utils.LoadPostgres(c.DBConfig, node.Info{})
|
||||||
c.DB = &serveDB
|
c.DB = &serveDB
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
|
@ -28,9 +28,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
eth2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||||
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/pkg/shared"
|
||||||
@ -88,7 +87,6 @@ func NewServer(settings *Config) (Server, error) {
|
|||||||
sn.QuitChan = make(chan bool)
|
sn.QuitChan = make(chan bool)
|
||||||
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
|
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
|
||||||
sn.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
|
sn.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
|
||||||
sn.NodeInfo = &settings.NodeInfo
|
|
||||||
return sn, nil
|
return sn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -273,7 +271,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.S
|
|||||||
for i := startingBlock; i <= endingBlock; i++ {
|
for i := startingBlock; i <= endingBlock; i++ {
|
||||||
select {
|
select {
|
||||||
case <-sap.QuitChan:
|
case <-sap.QuitChan:
|
||||||
log.Infof("%s watcher historical data feed to subscription %s closed", id)
|
log.Infof("ethereum historical data feed to subscription %s closed", id)
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -309,7 +307,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.S
|
|||||||
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
|
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
|
||||||
log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id)
|
log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id)
|
||||||
default:
|
default:
|
||||||
log.Infof("eth ipld server unable to send backFill completion notice to %s subscription %s", id)
|
log.Infof("eth ipld server unable to send backFill completion notice to subscription %s", id)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
|
@ -18,7 +18,7 @@ package shared
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/node"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Env variables
|
// Env variables
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
node "github.com/ipfs/go-ipld-format"
|
node "github.com/ipfs/go-ipld-format"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HandleZeroAddrPointer will return an emtpy string for a nil address pointer
|
// HandleZeroAddrPointer will return an emtpy string for a nil address pointer
|
||||||
|
@ -19,14 +19,14 @@ package shared
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/node"
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupDB is use to setup a db for watcher tests
|
// SetupDB is use to setup a db for watcher tests
|
||||||
|
@ -22,7 +22,7 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
|
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
var DBConfig postgres.Config
|
var DBConfig postgres.Config
|
||||||
|
Loading…
Reference in New Issue
Block a user