diff --git a/cmd/resync.go b/cmd/resync.go index 60826326..af16a58e 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -38,19 +38,22 @@ var resyncCmd = &cobra.Command{ } func rsyncCmdCommand() { + logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) + logWithCommand.Debug("loading super node configuration variables") rConfig, err := resync.NewReSyncConfig() if err != nil { logWithCommand.Fatal(err) } - logWithCommand.Infof("vdb version: %s", v.VersionWithMeta) logWithCommand.Infof("resync config: %+v", rConfig) if err := ipfs.InitIPFSPlugins(); err != nil { logWithCommand.Fatal(err) } + logWithCommand.Debug("initializing new resync service") rService, err := resync.NewResyncService(rConfig) if err != nil { logWithCommand.Fatal(err) } + logWithCommand.Info("starting up resync process") if err := rService.Resync(); err != nil { logWithCommand.Fatal(err) } diff --git a/cmd/superNode.go b/cmd/superNode.go index 3ab4e416..e2cfdd08 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -53,52 +53,61 @@ and fill in gaps in the data } func superNode() { + logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta) + logWithCommand.Debug("loading super node configuration variables") superNodeConfig, err := super_node.NewSuperNodeConfig() if err != nil { logWithCommand.Fatal(err) } - logWithCommand.Infof("vdb version: %s", v.VersionWithMeta) logWithCommand.Infof("super node config: %+v", superNodeConfig) if err := ipfs.InitIPFSPlugins(); err != nil { logWithCommand.Fatal(err) } wg := &sync.WaitGroup{} + logWithCommand.Debug("initializing new super node service") superNode, err := super_node.NewSuperNode(superNodeConfig) if err != nil { logWithCommand.Fatal(err) } var forwardPayloadChan chan shared.ConvertedData if superNodeConfig.Serve { + logWithCommand.Info("starting up super node servers") forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize) - superNode.FilterAndServe(wg, forwardPayloadChan) + superNode.Serve(wg, forwardPayloadChan) if err := startServers(superNode, superNodeConfig); err != nil { logWithCommand.Fatal(err) } } if superNodeConfig.Sync { - if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil { + logWithCommand.Info("starting up super node sync process") + if err := superNode.Sync(wg, forwardPayloadChan); err != nil { logWithCommand.Fatal(err) } } if superNodeConfig.BackFill { + logWithCommand.Debug("initializing new super node backfill service") backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) if err != nil { logWithCommand.Fatal(err) } - backFiller.FillGapsInSuperNode(wg) + logWithCommand.Info("starting up super node backfill process") + backFiller.BackFill(wg) } wg.Wait() } func startServers(superNode super_node.SuperNode, settings *super_node.Config) error { + logWithCommand.Debug("starting up IPC server") _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs()) if err != nil { return err } + logWithCommand.Debug("starting up WS server") _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true) if err != nil { return err } + logWithCommand.Debug("starting up HTTP server") _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{}) return err } @@ -106,7 +115,7 @@ func startServers(superNode super_node.SuperNode, settings *super_node.Config) e func init() { rootCmd.AddCommand(superNodeCmd) - // flags + // flags for all config variables superNodeCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path") superNodeCmd.PersistentFlags().String("supernode-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.") diff --git a/pkg/ipfs/builders.go b/pkg/ipfs/builders.go index d76bb16b..317a9454 100644 --- a/pkg/ipfs/builders.go +++ b/pkg/ipfs/builders.go @@ -19,6 +19,8 @@ package ipfs import ( "context" + "github.com/sirupsen/logrus" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/plugin/loader" @@ -29,6 +31,7 @@ import ( // InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node // This should only be called once func InitIPFSPlugins() error { + logrus.Debug("initializing IPFS plugins") l, err := loader.NewPluginLoader("") if err != nil { return err @@ -42,6 +45,7 @@ func InitIPFSPlugins() error { // InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs) func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) { + logrus.Debug("initializing IPFS block service interface") r, openErr := fsrepo.Open(ipfsPath) if openErr != nil { return nil, openErr @@ -68,6 +72,7 @@ func (ipfs IPFS) Add(node ipld.Node) error { } func InitIPFSNode(repoPath string) (*IPFS, error) { + logrus.Debug("initializing IPFS node interface") r, err := fsrepo.Open(repoPath) if err != nil { return nil, err diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index ff7514bb..2684df69 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -36,7 +36,7 @@ const ( // BackFillInterface for filling in gaps in the super node type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node - FillGapsInSuperNode(wg *sync.WaitGroup) + BackFill(wg *sync.WaitGroup) } // BackFillService for filling in gaps in the super node @@ -113,8 +113,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert }, nil } -// FillGapsInSuperNode periodically checks for and fills in gaps in the super node db -func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { +// BackFill periodically checks for and fills in gaps in the super node db +func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) { ticker := time.NewTicker(bfs.GapCheckFrequency) wg.Add(1) @@ -151,7 +151,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { } } }() - log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String()) + log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String()) } // backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 56adea4e..6ecd773a 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -72,7 +72,7 @@ var _ = Describe("BackFiller", func() { Chain: shared.Ethereum, } wg := &sync.WaitGroup{} - backfiller.FillGapsInSuperNode(wg) + backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) @@ -128,7 +128,7 @@ var _ = Describe("BackFiller", func() { Chain: shared.Ethereum, } wg := &sync.WaitGroup{} - backfiller.FillGapsInSuperNode(wg) + backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) @@ -178,7 +178,7 @@ var _ = Describe("BackFiller", func() { Chain: shared.Ethereum, } wg := &sync.WaitGroup{} - backfiller.FillGapsInSuperNode(wg) + backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) diff --git a/pkg/super_node/btc/http_streamer.go b/pkg/super_node/btc/http_streamer.go index 3cee388a..a9085f83 100644 --- a/pkg/super_node/btc/http_streamer.go +++ b/pkg/super_node/btc/http_streamer.go @@ -42,7 +42,7 @@ func NewHTTPPayloadStreamer(clientConfig *rpcclient.ConnConfig) *HTTPPayloadStre // Stream is the main loop for subscribing to data from the btc block notifications // Satisfies the shared.PayloadStreamer interface func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { - logrus.Info("streaming block payloads from btc") + logrus.Debug("streaming block payloads from btc") client, err := rpcclient.New(ps.Config, nil) if err != nil { return nil, err diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index 444a5d24..7a2a4a7c 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -602,8 +602,14 @@ var _ = Describe("Retriever", func() { payload8.HeaderCID.BlockNumber = "105" payload9 := payload4 payload9.HeaderCID.BlockNumber = "106" - payload10 := payload5 - payload10.HeaderCID.BlockNumber = "1000" + payload10 := payload4 + payload10.HeaderCID.BlockNumber = "107" + payload11 := payload4 + payload11.HeaderCID.BlockNumber = "108" + payload12 := payload4 + payload12.HeaderCID.BlockNumber = "109" + payload13 := payload5 + payload13.HeaderCID.BlockNumber = "1000" err := repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) @@ -624,18 +630,25 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload10) Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload11) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload12) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload13) + Expect(err).ToNot(HaveOccurred()) cleaner := eth.NewCleaner(db) - err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}}) + err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}}) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(5)) + Expect(len(gaps)).To(Equal(6)) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) - Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 106, Stop: 108})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 110, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) }) diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index c16ad6fe..31c60655 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -51,7 +51,7 @@ func NewPayloadStreamer(client StreamClient) *PayloadStreamer { // Satisfies the shared.PayloadStreamer interface func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { stateDiffChan := make(chan statediff.Payload, PayloadChanBufferSize) - logrus.Info("streaming diffs from geth") + logrus.Debug("streaming diffs from geth") go func() { for { select { diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 79eb0270..53b5bb24 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -38,15 +38,15 @@ const ( ) // SuperNode is the top level interface for streaming, converting to IPLDs, publishing, -// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients +// and indexing all chain data; screening this data; and serving it up to subscribed clients // This service is compatible with the Ethereum service interface (node.Service) type SuperNode interface { // APIs(), Protocols(), Start() and Stop() node.Service // Data processing event loop - ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error + Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error // Pub-Sub handling event loop - FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) + Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) // Method to subscribe to the service Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) // Method to unsubscribe from the service @@ -186,11 +186,11 @@ func (sap *Service) APIs() []rpc.API { return append(apis, chainAPI) } -// ProcessData streams incoming raw chain data and converts it for further processing +// Sync streams incoming raw chain data and converts it for further processing // It forwards the converted data to the publishAndIndex process(es) it spins up // If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel // This continues on no matter if or how many subscribers there are -func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error { +func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { return err @@ -213,7 +213,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err) continue } - log.Infof("processing %s data streamed at head height %d", sap.chain.String(), ipldPayload.Height()) + log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height()) // If we have a ScreenAndServe process running, forward the iplds to it select { case screenAndServePayload <- ipldPayload: @@ -230,7 +230,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- } } }() - log.Infof("%s ProcessData goroutine successfully spun up", sap.chain.String()) + log.Infof("%s Sync goroutine successfully spun up", sap.chain.String()) return nil } @@ -241,11 +241,13 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared for { select { case payload := <-publishAndIndexPayload: + log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height()) cidPayload, err := sap.Publisher.Publish(payload) if err != nil { log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) continue } + log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height()) if err := sap.Indexer.Index(cidPayload); err != nil { log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) } @@ -255,11 +257,11 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String()) } -// FilterAndServe listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process +// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process // It filters and sends this data to any subscribers to the service // This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only -func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { +func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { wg.Add(1) go func() { for { @@ -273,7 +275,7 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c } } }() - log.Infof("%s FilterAndServe goroutine successfully spun up", sap.chain.String()) + log.Infof("%s Serve goroutine successfully spun up", sap.chain.String()) } // filterAndServe filters the payload according to each subscription type and sends to the subscriptions @@ -444,10 +446,10 @@ func (sap *Service) Start(*p2p.Server) error { log.Infof("Starting %s super node service", sap.chain.String()) wg := new(sync.WaitGroup) payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize) - if err := sap.ProcessData(wg, payloadChan); err != nil { + if err := sap.Sync(wg, payloadChan); err != nil { return err } - sap.FilterAndServe(wg, payloadChan) + sap.Serve(wg, payloadChan) return nil } diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index eb1b5c5e..face19e7 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -31,7 +31,7 @@ import ( ) var _ = Describe("Service", func() { - Describe("SyncAndPublish", func() { + Describe("Sync", func() { It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { wg := new(sync.WaitGroup) payloadChan := make(chan shared.RawChainData, 1) @@ -63,7 +63,7 @@ var _ = Describe("Service", func() { QuitChan: quitChan, WorkerPoolSize: 1, } - err := processor.ProcessData(wg, nil) + err := processor.Sync(wg, nil) Expect(err).ToNot(HaveOccurred()) time.Sleep(2 * time.Second) quitChan <- true diff --git a/version/version.go b/version/version.go index 480d6a9b..43c8f6ed 100644 --- a/version/version.go +++ b/version/version.go @@ -21,7 +21,7 @@ import "fmt" const ( VersionMajor = 0 // Major version component of the current release VersionMinor = 1 // Minor version component of the current release - VersionPatch = 0 // Patch version component of the current release + VersionPatch = 1 // Patch version component of the current release VersionMeta = "alpha" // Version metadata to append to the version string )