Use pprof + pointers

This commit is contained in:
Abdul Rabbani 2022-06-20 13:56:55 -04:00
parent 117a2deea9
commit 53c4e4243c
9 changed files with 59 additions and 11 deletions

View File

@ -18,7 +18,10 @@ package cmd
import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"syscall"
log "github.com/sirupsen/logrus"
@ -60,6 +63,12 @@ func bootApp() {
notifierCh <- syscall.SIGTERM
}()
if viper.GetBool("t.pprof") {
go func() {
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
}()
}
err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")

View File

@ -50,6 +50,8 @@ var (
maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
isTestPprof bool
testPprofPort int
)
// captureCmd represents the capture command
@ -114,6 +116,8 @@ func init() {
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
captureCmd.PersistentFlags().BoolVar(&isTestPprof, "t.pprof", false, "Should we start pprof?")
captureCmd.PersistentFlags().IntVar(&testPprofPort, "t.pprofPort", 6060, "What port should we export pprof at?")
// Bind Flags with Viper
//// DB Flags
@ -133,6 +137,10 @@ func init() {
//// Testing Specific
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
err = viper.BindPFlag("t.pprof", captureCmd.PersistentFlags().Lookup("t.pprof"))
exitErr(err)
err = viper.BindPFlag("t.pprofPort", captureCmd.PersistentFlags().Lookup("t.pprofPort"))
exitErr(err)
//// LH specific
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))

View File

@ -19,6 +19,7 @@ package cmd
import (
"context"
"fmt"
"net/http"
"strconv"
log "github.com/sirupsen/logrus"
@ -107,6 +108,12 @@ func startFullProcessing() {
}()
}
if viper.GetBool("t.pprof") {
go func() {
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
}()
}
// Shutdown when the time is right.
err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {

View File

@ -81,6 +81,12 @@ func startHeadTracking() {
}()
}
if viper.GetBool("t.pprof") {
go func() {
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
}()
}
// Shutdown when the time is right.
err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {

View File

@ -22,6 +22,9 @@ import (
"os"
"strconv"
"net/http"
_ "net/http/pprof"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -92,6 +95,12 @@ func startHistoricProcessing() {
}()
}
if viper.GetBool("t.pprof") {
go func() {
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
}()
}
// Shutdown when the time is right.
err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {

View File

@ -19,7 +19,9 @@
"checkDb": true
},
"t": {
"skipSync": true
"skipSync": true,
"pprof": true,
"pprofPort": 6060
},
"log": {
"level": "debug",

View File

@ -19,7 +19,9 @@
"checkDb": true
},
"t": {
"skipSync": true
"skipSync": true,
"pprof": true,
"pprofPort": 6060
},
"log": {
"level": "debug",

View File

@ -63,12 +63,12 @@ type ProcessSlot struct {
PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics.
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
SszSignedBeaconBlock *[]byte // The entire SSZ encoded SignedBeaconBlock
FullSignedBeaconBlock si.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
// BeaconState
FullBeaconState state.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
SszBeaconState []byte // The entire SSZ encoded BeaconState
SszBeaconState *[]byte // The entire SSZ encoded BeaconState
// DB Write objects
DbSlotsModel *DbSlots // The model being written to the slots table.
@ -155,6 +155,11 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
})
if err := g.Wait(); err != nil {
// Make sure channel is empty.
select {
case <-vUnmarshalerCh:
default:
}
return err, "processSlot"
}
@ -270,7 +275,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
vm := <-vmCh
if rc != 200 {
ps.FullSignedBeaconBlock = &wrapper.Phase0SignedBeaconBlock{}
ps.SszSignedBeaconBlock = []byte{}
ps.SszSignedBeaconBlock = &[]byte{}
ps.ParentBlockRoot = ""
ps.Status = "skipped"
return nil
@ -280,7 +285,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
return fmt.Errorf(VersionedUnmarshalerError)
}
ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(ps.SszSignedBeaconBlock)
ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(*ps.SszSignedBeaconBlock)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Warn("Unable to process the slots SignedBeaconBlock")
return nil
@ -300,14 +305,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
versionedUnmarshaler, err := dt.FromState(ps.SszBeaconState)
versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(VersionedUnmarshalerError)
vmCh <- nil
return fmt.Errorf(VersionedUnmarshalerError)
}
vmCh <- versionedUnmarshaler
ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(ps.SszBeaconState)
ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(*ps.SszBeaconState)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to process the slots BeaconState")
return err
@ -356,7 +361,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics)
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}

View File

@ -28,7 +28,7 @@ import (
)
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, int, error) {
func querySsz(endpoint string, slot string) (*[]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
@ -49,7 +49,7 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, rc, nil
return &body, rc, nil
}
// Object to unmarshal the BlockRootResponse