diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index 6859764..5831407 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -82,7 +82,7 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan< return } processCh <- msgMarshaled - log.Info("Done sending") + log.Debug("Done sending") } // Capture all of the event topics. diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 4b1f55e..9e09c3a 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -156,10 +156,6 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, if err := g.Wait(); err != nil { // Make sure channel is empty. - select { - case <-vUnmarshalerCh: - default: - } return err, "processSlot" } @@ -296,14 +292,20 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d // Update the SszBeaconState and FullBeaconState object with their respective values. func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error { - var stateIdentifier string // Used to query the state + var ( + stateIdentifier string // Used to query the state + err error + ) if ps.StateRoot != "" { stateIdentifier = ps.StateRoot } else { stateIdentifier = strconv.Itoa(ps.Slot) } stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier - ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) + ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) + if err != nil { + return fmt.Errorf("Unable to querrySSZ") + } versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState) if err != nil { diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index b204404..3efbd6b 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -3,11 +3,11 @@ package beaconclient_test import ( "context" "os" - "runtime" "strconv" "time" . "github.com/onsi/ginkgo/v2" + //. "github.com/onsi/gomega" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" ) @@ -65,12 +65,23 @@ func getEnvInt(envVar string) int { // Start head tracking and wait for the expected results. func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { - startGoRoutines := runtime.NumGoroutine() + //startGoRoutines := runtime.NumGoroutine() + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, false) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) cancel() - testStopHeadTracking(ctx, bc, startGoRoutines) + time.Sleep(4) + testStopSystemHeadTracking(ctx, bc) +} + +// Custom stop for system testing +func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) { + bc.StopHeadTracking(ctx, false) + + time.Sleep(3 * time.Second) + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //Expect(endNum <= startGoRoutines).To(BeTrue()) }