Merge pull request #64 from vulcanize/feature/bug-remove-processed-slots
Send slots to processedCh and remove
This commit is contained in:
commit
d5a9f999b3
@ -100,6 +100,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
|
||||
// Start workers
|
||||
for w := 1; w <= maxWorkers; w++ {
|
||||
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
|
||||
|
||||
// Pass in function to increment metric! KnownGapProcessing or HistoricProcessing
|
||||
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb)
|
||||
}
|
||||
|
||||
@ -125,6 +127,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
|
||||
} else if slots.startSlot == slots.endSlot {
|
||||
log.WithField("slot", slots.startSlot).Debug("Added new slot to workCh")
|
||||
workCh <- slots.startSlot
|
||||
processedCh <- slots
|
||||
} else {
|
||||
for i := slots.startSlot; i <= slots.endSlot; i++ {
|
||||
workCh <- i
|
||||
|
@ -237,8 +237,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
|
||||
case slots := <-processCh:
|
||||
// Make sure the start and end slot exist in the slots table.
|
||||
go func() {
|
||||
finishedProcess := false
|
||||
for !finishedProcess {
|
||||
log.WithFields(log.Fields{
|
||||
"startSlot": slots.startSlot,
|
||||
"endSlot": slots.endSlot,
|
||||
}).Debug("Starting to check to see if the following slots have been processed")
|
||||
for {
|
||||
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
@ -248,8 +251,9 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
|
||||
errCh <- err
|
||||
}
|
||||
if isStartProcess && isEndProcess {
|
||||
finishedProcess = true
|
||||
break
|
||||
}
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
}
|
||||
|
||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||
|
Loading…
Reference in New Issue
Block a user