Improve entrypoint to be more helpful #63

Merged
abdulrabbani00 merged 3 commits from develop into main 2022-06-14 13:36:38 +00:00
3 changed files with 20 additions and 9 deletions

View File

@ -3,17 +3,21 @@
sleep 10 sleep 10
echo "Starting ipld-eth-beacon-indexer" echo "Starting ipld-eth-beacon-indexer"
echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json echo /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
/root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json /root/ipld-eth-beacon-indexer capture ${CAPTURE_MODE} --config /root/ipld-eth-beacon-config.json > /root/ipld-eth-beacon-indexer.output
rv=$? rv=$?
if [ $rv != 0 ]; then if [ $rv != 0 ]; then
echo "ipld-eth-beacon-indexer startup failed" echo "ipld-eth-beacon-indexer failed"
echo 1 > /root/HEALTH echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
else else
echo "ipld-eth-beacon-indexer startup succeeded" echo "ipld-eth-beacon-indexer succeeded"
echo 0 > /root/HEALTH echo $rv > /root/HEALTH
echo $rv
cat /root/ipld-eth-beacon-indexer.output
fi fi
tail -f /dev/null tail -f /dev/null

View File

@ -100,6 +100,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
// Start workers // Start workers
for w := 1; w <= maxWorkers; w++ { for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
// Pass in function to increment metric! KnownGapProcessing or HistoricProcessing
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb) go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb)
} }
@ -125,6 +127,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
} else if slots.startSlot == slots.endSlot { } else if slots.startSlot == slots.endSlot {
log.WithField("slot", slots.startSlot).Debug("Added new slot to workCh") log.WithField("slot", slots.startSlot).Debug("Added new slot to workCh")
workCh <- slots.startSlot workCh <- slots.startSlot
processedCh <- slots
} else { } else {
for i := slots.startSlot; i <= slots.endSlot; i++ { for i := slots.startSlot; i <= slots.endSlot; i++ {
workCh <- i workCh <- i

View File

@ -237,8 +237,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
case slots := <-processCh: case slots := <-processCh:
// Make sure the start and end slot exist in the slots table. // Make sure the start and end slot exist in the slots table.
go func() { go func() {
finishedProcess := false log.WithFields(log.Fields{
for !finishedProcess { "startSlot": slots.startSlot,
"endSlot": slots.endSlot,
}).Debug("Starting to check to see if the following slots have been processed")
for {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil { if err != nil {
errCh <- err errCh <- err
@ -248,8 +251,9 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
errCh <- err errCh <- err
} }
if isStartProcess && isEndProcess { if isStartProcess && isEndProcess {
finishedProcess = true break
} }
time.Sleep(1000 * time.Millisecond)
} }
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))