Use a transaction for writing, knowngaps, and reorgs #53
@ -16,7 +16,7 @@ import (
|
|||||||
var _ = Describe("Capturehistoric", func() {
|
var _ = Describe("Capturehistoric", func() {
|
||||||
|
|
||||||
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
|
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
|
||||||
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() {
|
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", func() {
|
||||||
It("Successfully Process the Block", func() {
|
It("Successfully Process the Block", func() {
|
||||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||||
|
@ -319,9 +319,9 @@ func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot
|
|||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||||
"proposedCount": proposedCount,
|
"proposedCount": proposedCount,
|
||||||
}).Error("Too many rows were marked as proposed!")
|
}).Error("Too many rows were marked as proposed!")
|
||||||
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics)
|
||||||
} else if proposedCount == 0 {
|
} else if proposedCount == 0 {
|
||||||
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics)
|
||||||
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -382,13 +382,19 @@ func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot
|
|||||||
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
||||||
// have 10 entries as follows: 1-10, 11-20, etc...
|
// have 10 entries as follows: 1-10, 11-20, etc...
|
||||||
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
|
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
|
||||||
|
var entryErrorMsg string
|
||||||
|
if entryError == nil {
|
||||||
|
entryErrorMsg = ""
|
||||||
|
} else {
|
||||||
|
entryErrorMsg = entryError.Error()
|
||||||
|
}
|
||||||
if endSlot-startSlot <= tableIncrement {
|
if endSlot-startSlot <= tableIncrement {
|
||||||
kgModel := DbKnownGaps{
|
kgModel := DbKnownGaps{
|
||||||
StartSlot: strconv.Itoa(startSlot),
|
StartSlot: strconv.Itoa(startSlot),
|
||||||
EndSlot: strconv.Itoa(endSlot),
|
EndSlot: strconv.Itoa(endSlot),
|
||||||
CheckedOut: false,
|
CheckedOut: false,
|
||||||
ReprocessingError: "",
|
ReprocessingError: "",
|
||||||
EntryError: entryError.Error(),
|
EntryError: entryErrorMsg,
|
||||||
EntryProcess: entryProcess,
|
EntryProcess: entryProcess,
|
||||||
}
|
}
|
||||||
upsertKnownGaps(tx, ctx, kgModel, metric)
|
upsertKnownGaps(tx, ctx, kgModel, metric)
|
||||||
@ -413,7 +419,7 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
|
|||||||
EndSlot: strconv.Itoa(tempEnd),
|
EndSlot: strconv.Itoa(tempEnd),
|
||||||
CheckedOut: false,
|
CheckedOut: false,
|
||||||
ReprocessingError: "",
|
ReprocessingError: "",
|
||||||
EntryError: entryError.Error(),
|
EntryError: entryErrorMsg,
|
||||||
EntryProcess: entryProcess,
|
EntryProcess: entryProcess,
|
||||||
}
|
}
|
||||||
upsertKnownGaps(tx, ctx, kgModel, metric)
|
upsertKnownGaps(tx, ctx, kgModel, metric)
|
||||||
|
@ -98,12 +98,12 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError,
|
|||||||
for slot := range workCh {
|
for slot := range workCh {
|
||||||
log.Debug("Handling slot: ", slot)
|
log.Debug("Handling slot: ", slot)
|
||||||
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
|
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
|
||||||
|
if err != nil {
|
||||||
errMs := batchHistoricError{
|
errMs := batchHistoricError{
|
||||||
err: err,
|
err: err,
|
||||||
errProcess: errProcess,
|
errProcess: errProcess,
|
||||||
slot: slot,
|
slot: slot,
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
errCh <- errMs
|
errCh <- errMs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user