diff --git a/entrypoint.sh b/entrypoint.sh index 13da8da..70858ac 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -3,17 +3,21 @@ sleep 10 echo "Starting ipld-eth-beacon-indexer" -echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json +echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output -/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json +/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output rv=$? if [ $rv != 0 ]; then - echo "ipld-eth-beacon-indexer startup failed" - echo 1 > /root/HEALTH + echo "ipld-eth-beacon-indexer failed" + echo $rv > /root/HEALTH + echo $rv + cat /root/ipld-eth-beacon-indexer.output else - echo "ipld-eth-beacon-indexer startup succeeded" - echo 0 > /root/HEALTH + echo "ipld-eth-beacon-indexer succeeded" + echo $rv > /root/HEALTH + echo $rv + cat /root/ipld-eth-beacon-indexer.output fi tail -f /dev/null \ No newline at end of file diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index a6764db..af3bd69 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -100,6 +100,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, // Start workers for w := 1; w <= maxWorkers; w++ { log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") + + // Pass in function to increment metric! KnownGapProcessing or HistoricProcessing go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb) } @@ -125,6 +127,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, } else if slots.startSlot == slots.endSlot { log.WithField("slot", slots.startSlot).Debug("Added new slot to workCh") workCh <- slots.startSlot + processedCh <- slots } else { for i := slots.startSlot; i <= slots.endSlot; i++ { workCh <- i diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index dcff8f4..7f2096d 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -237,8 +237,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan case slots := <-processCh: // Make sure the start and end slot exist in the slots table. go func() { - finishedProcess := false - for !finishedProcess { + log.WithFields(log.Fields{ + "startSlot": slots.startSlot, + "endSlot": slots.endSlot, + }).Debug("Starting to check to see if the following slots have been processed") + for { isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) if err != nil { errCh <- err @@ -248,8 +251,9 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan errCh <- err } if isStartProcess && isEndProcess { - finishedProcess = true + break } + time.Sleep(1000 * time.Millisecond) } _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))