Feature/study application memory #70

Open
abdulrabbani00 wants to merge 18 commits from feature/study-application-memory into develop
3 changed files with 24 additions and 11 deletions
Showing only changes of commit ac1e3075e2 - Show all commits

View File

@ -82,7 +82,7 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<
return return
} }
processCh <- msgMarshaled processCh <- msgMarshaled
log.Info("Done sending") log.Debug("Done sending")
} }
// Capture all of the event topics. // Capture all of the event topics.

View File

@ -156,10 +156,6 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
// Make sure channel is empty. // Make sure channel is empty.
select {
case <-vUnmarshalerCh:
default:
}
return err, "processSlot" return err, "processSlot"
} }
@ -296,14 +292,20 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
// Update the SszBeaconState and FullBeaconState object with their respective values. // Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error { func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error {
var stateIdentifier string // Used to query the state var (
stateIdentifier string // Used to query the state
err error
)
if ps.StateRoot != "" { if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot stateIdentifier = ps.StateRoot
} else { } else {
stateIdentifier = strconv.Itoa(ps.Slot) stateIdentifier = strconv.Itoa(ps.Slot)
} }
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return fmt.Errorf("Unable to querrySSZ")
}
versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState) versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState)
if err != nil { if err != nil {

View File

@ -3,11 +3,11 @@ package beaconclient_test
import ( import (
"context" "context"
"os" "os"
"runtime"
"strconv" "strconv"
"time" "time"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
//. "github.com/onsi/gomega" //. "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
) )
@ -65,12 +65,23 @@ func getEnvInt(envVar string) int {
// Start head tracking and wait for the expected results. // Start head tracking and wait for the expected results.
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine() //startGoRoutines := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, false) go bc.CaptureHead(ctx, 2, false)
time.Sleep(1 * time.Second) time.Sleep(5 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
cancel() cancel()
testStopHeadTracking(ctx, bc, startGoRoutines) time.Sleep(4)
testStopSystemHeadTracking(ctx, bc)
}
// Custom stop for system testing
func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) {
bc.StopHeadTracking(ctx, false)
time.Sleep(3 * time.Second)
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
} }