From 9d8f5bc6eb07c88498d962baef96f1bc1fbbdf90 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Wed, 8 Jun 2022 11:54:29 -0400 Subject: [PATCH] Ensure we release locks properly --- pkg/beaconclient/capturehistoric_test.go | 22 ++++++++++++++++++++-- pkg/beaconclient/processhistoric.go | 2 +- pkg/beaconclient/processknowngaps.go | 2 +- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 9c13f23..0f324b4 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -11,6 +11,12 @@ import ( . "github.com/onsi/gomega" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" +) + +var ( + kgCheckCheckedOutStmt = `SELECT * FROM ethcl.known_gaps WHERE checked_out=true ` + hpCheckCheckedOutStmt = `SELECT * FROM ethcl.historic_process WHERE checked_out=true ` ) var _ = Describe("Capturehistoric", func() { @@ -137,8 +143,9 @@ func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, ma validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) log.Debug("Calling the stop function for historical processing..") err := bc.StopHistoric(cancel) + time.Sleep(5 * time.Second) Expect(err).ToNot(HaveOccurred()) - time.Sleep(3 * time.Second) + validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) } // Wrapper function that processes knownGaps @@ -147,8 +154,9 @@ func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, max go bc.ProcessKnownGaps(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) err := bc.StopKnownGapsProcessing(cancel) + time.Sleep(5 * time.Second) Expect(err).ToNot(HaveOccurred()) - time.Sleep(3 * time.Second) + validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt) } func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { @@ -209,3 +217,13 @@ func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) { validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) } + +// Make sure all rows have checked_out as false. +func validateAllRowsCheckedOut(db sql.Database, checkStmt string) { + res, err := db.Exec(context.Background(), checkStmt) + Expect(err).ToNot(HaveOccurred()) + rows, err := res.RowsAffected() + log.Info("rows: ", rows) + Expect(err).ToNot(HaveOccurred()) + Expect(rows).To(Equal(int64(0))) +} diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 262126d..f8d54a3 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -47,7 +47,7 @@ var ( WHERE start_slot=$1 AND end_slot=$2;` // Used to update every single row that this node has checked out. releaseHpLockStmt string = `UPDATE ethcl.historic_process - SET checked_out=false + SET checked_out=false, checked_out_by=null WHERE checked_out_by=$1` ) diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 0812daf..5ea21e9 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -46,7 +46,7 @@ var ( WHERE start_slot=$1 AND end_slot=$2;` // Used to update every single row that this node has checked out. releaseKgLockStmt string = `UPDATE ethcl.known_gaps - SET checked_out=false + SET checked_out=false, checked_out_by=null WHERE checked_out_by=$1` )