diff --git a/cmd/boot.go b/cmd/boot.go index 63e15b9..c40f8b2 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -18,7 +18,10 @@ package cmd import ( "context" + "fmt" + "net/http" "os" + "strconv" "syscall" log "github.com/sirupsen/logrus" @@ -60,6 +63,12 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") diff --git a/cmd/capture.go b/cmd/capture.go index f5693a6..d144834 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -50,6 +50,8 @@ var ( maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second notifierCh chan os.Signal = make(chan os.Signal, 1) testDisregardSync bool + isTestPprof bool + testPprofPort int ) // captureCmd represents the capture command @@ -114,6 +116,8 @@ func init() { //// Testing Specific captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?") + captureCmd.PersistentFlags().BoolVar(&isTestPprof, "t.pprof", false, "Should we start pprof?") + captureCmd.PersistentFlags().IntVar(&testPprofPort, "t.pprofPort", 6060, "What port should we export pprof at?") // Bind Flags with Viper //// DB Flags @@ -133,6 +137,10 @@ func init() { //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) exitErr(err) + err = viper.BindPFlag("t.pprof", captureCmd.PersistentFlags().Lookup("t.pprof")) + exitErr(err) + err = viper.BindPFlag("t.pprofPort", captureCmd.PersistentFlags().Lookup("t.pprofPort")) + exitErr(err) //// LH specific err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address")) diff --git a/cmd/full.go b/cmd/full.go index 0c4b9d2..7d50c8a 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "net/http" "strconv" log "github.com/sirupsen/logrus" @@ -107,6 +108,12 @@ func startFullProcessing() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { diff --git a/cmd/head.go b/cmd/head.go index ba70f8c..4688087 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -81,6 +81,12 @@ func startHeadTracking() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { diff --git a/cmd/historic.go b/cmd/historic.go index 1c6b653..6e0a03e 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -22,6 +22,9 @@ import ( "os" "strconv" + "net/http" + _ "net/http/pprof" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -92,6 +95,12 @@ func startHistoricProcessing() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { diff --git a/config/cicd/boot.ipld-eth-beacon-indexer.json b/config/cicd/boot.ipld-eth-beacon-indexer.json index b10cc13..a042f7b 100644 --- a/config/cicd/boot.ipld-eth-beacon-indexer.json +++ b/config/cicd/boot.ipld-eth-beacon-indexer.json @@ -19,7 +19,9 @@ "checkDb": true }, "t": { - "skipSync": true + "skipSync": true, + "pprof": true, + "pprofPort": 6060 }, "log": { "level": "debug", diff --git a/config/example.ipld-eth-beacon-indexer-config.json b/config/example.ipld-eth-beacon-indexer-config.json index 7481284..b41b32c 100644 --- a/config/example.ipld-eth-beacon-indexer-config.json +++ b/config/example.ipld-eth-beacon-indexer-config.json @@ -19,7 +19,9 @@ "checkDb": true }, "t": { - "skipSync": true + "skipSync": true, + "pprof": true, + "pprofPort": 6060 }, "log": { "level": "debug", diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 1b8f619..4e075b4 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -63,12 +63,12 @@ type ProcessSlot struct { PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics. // BeaconBlock - SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock + SszSignedBeaconBlock *[]byte // The entire SSZ encoded SignedBeaconBlock FullSignedBeaconBlock si.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors. // BeaconState FullBeaconState state.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors. - SszBeaconState []byte // The entire SSZ encoded BeaconState + SszBeaconState *[]byte // The entire SSZ encoded BeaconState // DB Write objects DbSlotsModel *DbSlots // The model being written to the slots table. @@ -155,6 +155,11 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, }) if err := g.Wait(); err != nil { + // Make sure channel is empty. + select { + case <-vUnmarshalerCh: + default: + } return err, "processSlot" } @@ -270,7 +275,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d vm := <-vmCh if rc != 200 { ps.FullSignedBeaconBlock = &wrapper.Phase0SignedBeaconBlock{} - ps.SszSignedBeaconBlock = []byte{} + ps.SszSignedBeaconBlock = &[]byte{} ps.ParentBlockRoot = "" ps.Status = "skipped" return nil @@ -280,7 +285,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d return fmt.Errorf(VersionedUnmarshalerError) } - ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(ps.SszSignedBeaconBlock) + ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(*ps.SszSignedBeaconBlock) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Warn("Unable to process the slots SignedBeaconBlock") return nil @@ -300,14 +305,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) - versionedUnmarshaler, err := dt.FromState(ps.SszBeaconState) + versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(VersionedUnmarshalerError) vmCh <- nil return fmt.Errorf(VersionedUnmarshalerError) } vmCh <- versionedUnmarshaler - ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(ps.SszBeaconState) + ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(*ps.SszBeaconState) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to process the slots BeaconState") return err @@ -356,7 +361,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st status = "proposed" } - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics) + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) if err != nil { return dw, err } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 5294335..4962d2b 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -28,7 +28,7 @@ import ( ) // A helper function to query endpoints that utilize slots. -func querySsz(endpoint string, slot string) ([]byte, int, error) { +func querySsz(endpoint string, slot string) (*[]byte, int, error) { log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") client := &http.Client{} req, err := http.NewRequest("GET", endpoint, nil) @@ -49,7 +49,7 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } - return body, rc, nil + return &body, rc, nil } // Object to unmarshal the BlockRootResponse