Feature/44 read write historic slots #46

Merged
abdulrabbani00 merged 10 commits from feature/44-read-write-historic-slots into develop 2022-05-24 20:18:55 +00:00
3 changed files with 23 additions and 13 deletions
Showing only changes of commit 48d7b0a416 - Show all commits

View File

@ -206,4 +206,5 @@ jobs:
- name: golangci-lint - name: golangci-lint
uses: golangci/golangci-lint-action@v3 uses: golangci/golangci-lint-action@v3
with: with:
args: --timeout 90s --disable deadcode args: --timeout 90s --disable deadcode,
# args: --timeout 90s --disable deadcode,unused

View File

@ -18,6 +18,8 @@
package beaconclient package beaconclient
import ( import (
"fmt"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -85,10 +87,24 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
// Process all ranges and send each individual slot to the worker. // Process all ranges and send each individual slot to the worker.
go func() { go func() {
for slots := range slotsCh { for slots := range slotsCh {
for i := slots.startSlot; i <= slots.endSlot; i++ { if slots.startSlot > slots.endSlot {
workCh <- i log.Error("We received a batch process request where the startSlot is greater than the end slot.")
errCh <- batchHistoricError{
err: fmt.Errorf("We received a startSlot where the start was greater than the end."),
errProcess: "RangeOrder",
slot: slots.startSlot,
}
errCh <- batchHistoricError{
err: fmt.Errorf("We received a endSlot where the start was greater than the end."),
errProcess: "RangeOrder",
slot: slots.endSlot,
}
} else {
for i := slots.startSlot; i <= slots.endSlot; i++ {
workCh <- i
}
processedCh <- slots
} }
processedCh <- slots
} }
}() }()

View File

@ -45,7 +45,6 @@ var (
return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj) return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj)
} }
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash" MissingEth1Data = "Can't get the Eth1 block_hash"
VersionedUnmarshalerError = "Unable to create a versioned unmarshaler" VersionedUnmarshalerError = "Unable to create a versioned unmarshaler"
) )
@ -160,11 +159,8 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
var blockIdentifier string // Used to query the block var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" { if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot blockIdentifier = ps.BlockRoot
} else if ps.Slot != 0 {
blockIdentifier = strconv.Itoa(ps.Slot)
} else { } else {
log.Error(MissingIdentifiedError) blockIdentifier = strconv.Itoa(ps.Slot)
return fmt.Errorf(MissingIdentifiedError)
} }
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
var err error var err error
@ -209,11 +205,8 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
var stateIdentifier string // Used to query the state var stateIdentifier string // Used to query the state
if ps.StateRoot != "" { if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot stateIdentifier = ps.StateRoot
} else if ps.Slot != 0 {
stateIdentifier = strconv.Itoa(ps.Slot)
} else { } else {
log.Error(MissingIdentifiedError) stateIdentifier = strconv.Itoa(ps.Slot)
return fmt.Errorf(MissingIdentifiedError)
} }
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))