eth-blob-indexer/pkg/beacon/service.go

152 lines
4.0 KiB
Go

package beacon
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
log "log/slog"
"net/http"
"time"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
"github.com/protolambda/zrnt/eth2/beacon/common"
)
// Path templates for the beacon node REST API. Each template holds a single %s
// verb that is filled in through BeaconClient.Endpoint.
var (
// BeaconNodeHealthPath = "/eth/v1/node/health"
// BeaconEventsPath is the event stream path; %s receives the topic name
// ("blob_sidecar" and "head" are the topics subscribed to in this file).
BeaconEventsPath = "/eth/v1/events?topics=%s"
// BeaconBlobSidecarsPath is the blob sidecars path; %s receives the block identifier.
BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s"
)
// BeaconClient subscribes to events from a beacon node and fetches newly available blobs.
type BeaconClient struct {
// URL is the base URL of the beacon node; Endpoint prepends it to every API path.
URL string
}
// Endpoint returns the full URL of a beacon API path on this client's node.
//
// path is a fmt-style template (for example BeaconEventsPath) and args are the
// values substituted into it. Only path is interpreted as a format string: the
// base URL is prepended verbatim, so a '%' inside it (such as percent-encoded
// credentials or path segments) is preserved instead of being parsed as a verb.
func (bc *BeaconClient) Endpoint(path string, args ...any) string {
	return bc.URL + fmt.Sprintf(path, args...)
}
// CollectBlobs subscribes to the beacon node's "blob_sidecar" and "head" event streams and
// stores every blob that becomes available in db.
//
// blob_sidecar events are received after validation over gossip, and the blobs they reference
// will not be available until head is updated with the corresponding block. Slots announced by
// blob_sidecar events are therefore kept pending and are only fetched once the last seen head
// slot has reached them.
//
// The call blocks until ctx is cancelled (returning ctx.Err()), a subscription reports an
// error, an event payload cannot be decoded, or fetching or storing a blob fails.
func (bc *BeaconClient) CollectBlobs(ctx context.Context, db storage.KvStorage) error {
	// Both subscriptions deliver into the same channel; events are told apart by event.Event.
	events := make(chan *Event)
	subBlobs := CreateSubscription(ctx, bc.Endpoint(BeaconEventsPath, "blob_sidecar"), events)
	defer subBlobs.Close()
	subHead := CreateSubscription(ctx, bc.Endpoint(BeaconEventsPath, "head"), events)
	defer subHead.Close()

	// Last seen head slot
	// TODO: initialize with current head
	var lastHead common.Slot
	// Slots we know have blobs but haven't fetched yet, with the number of blob_sidecar
	// events seen for each.
	// TODO: persist and load?
	unfetched := map[common.Slot]int{}
	// TODO: reorgs?

	// A single reusable timer reports idleness; it is re-armed on every pass through the loop
	// instead of allocating a fresh 5-minute timer per event.
	timeout := 5 * time.Minute
	idle := time.NewTimer(timeout)
	defer idle.Stop()

	for {
		// Re-arm the idle timer, discarding a tick that fired but was not consumed.
		if !idle.Stop() {
			select {
			case <-idle.C:
			default:
			}
		}
		idle.Reset(timeout)

		select {
		case event := <-events:
			switch string(event.Event) {
			case "head":
				var headEvent HeadEvent
				if err := json.Unmarshal(event.Data, &headEvent); err != nil {
					log.Error("Error unmarshalling event data", "error", err)
					return err
				}
				log.Debug("Received head event", "slot", headEvent.Slot, "block", headEvent.Block)
				if headEvent.Slot <= lastHead {
					log.Error("Unexpected head slot", "slot", headEvent.Slot)
				}
				lastHead = headEvent.Slot
			case "blob_sidecar":
				var blobEvent BlobSidecarEvent
				if err := json.Unmarshal(event.Data, &blobEvent); err != nil {
					log.Error("Error unmarshalling event data", "error", err)
					return err
				}
				log.Debug("Received blob_sidecar event",
					"slot", blobEvent.Slot,
					"block", blobEvent.BlockRoot,
					"vhash", blobEvent.VersionedHash,
				)
				unfetched[blobEvent.Slot]++
			}
			// Any event may have made pending slots fetchable, so check after each one.
			if err := bc.fetchReady(ctx, db, unfetched, lastHead); err != nil {
				return err
			}
		case <-idle.C:
			log.Debug("No events received", "for", timeout)
		case err := <-subBlobs.Err():
			if err != nil {
				return err
			}
		case err := <-subHead.Err():
			if err != nil {
				return err
			}
		case <-ctx.Done():
			log.Info("Context cancelled, exiting")
			return ctx.Err()
		}
	}
}

// fetchReady fetches and stores the blobs of every pending slot that is not past lastHead.
//
// A slot is removed from unfetched as soon as a non-empty response for it has been stored in
// full. The response is a single request for the whole slot, so completion does not depend on
// the number of blob_sidecar events that happened to be observed for it; counting blobs against
// events would leave the slot pending forever whenever the two numbers differ. A slot whose
// response is empty stays pending and is retried on the next event.
func (bc *BeaconClient) fetchReady(ctx context.Context, db storage.KvStorage, unfetched map[common.Slot]int, lastHead common.Slot) error {
	for slot := range unfetched {
		if slot > lastHead {
			continue
		}
		resp, err := fetchBlobs(bc, slot.String())
		if err != nil {
			log.Error("Error fetching blobs", "error", err)
			return err
		}
		if len(resp.Data) == 0 {
			log.Error("No blobs in block", "slot", slot)
			continue
		}
		for i := range resp.Data {
			if err := pushBlob(ctx, &resp.Data[i], db); err != nil {
				return err
			}
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(unfetched, slot)
	}
	return nil
}
// pushBlob decodes the hex payload of a blob sidecar and stores the raw bytes in db under the
// key derived from the versioned hash of the sidecar's KZG commitment.
//
// It returns an error when the payload is empty or not valid hex, or when the store rejects
// the write.
func pushBlob(ctx context.Context, blob *BlobSidecar, db storage.KvStorage) error {
	vhash := blob.KzgCommitment.ToVersionedHash()

	// Strip the "0x" prefix only when it is actually there. Slicing off two characters
	// unconditionally panics on a payload shorter than two characters and silently drops a
	// real byte when the prefix is missing.
	hexData := blob.Blob
	if len(hexData) >= 2 && hexData[:2] == "0x" {
		hexData = hexData[2:]
	}
	data, err := hex.DecodeString(hexData)
	if err != nil {
		log.Error("Error decoding blob", "error", err)
		return err
	}
	if len(data) == 0 {
		// Do not write an empty value for a sidecar that carried no payload.
		err := fmt.Errorf("blob %v has an empty payload", vhash)
		log.Error("Error decoding blob", "error", err)
		return err
	}

	log.Info("Storing blob", "vhash", vhash)
	// NOTE(review): the final argument 0 is presumably "no expiry" — confirm against
	// storage.KvStorage.
	if err := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0); err != nil {
		log.Error("Error storing blob", "error", err)
		return err
	}
	return nil
}
// beaconHTTPClient is the client used for beacon API requests. Its timeout bounds the
// whole exchange, including reading the body, so an unresponsive node cannot block the
// caller indefinitely (http.Get uses a client without any timeout).
var beaconHTTPClient = &http.Client{Timeout: 30 * time.Second}

// fetchBlobs requests the blob sidecars of the block identified by blockId from the beacon
// node and decodes the JSON response.
//
// Outcomes by HTTP status:
//   - 200: the decoded response is returned.
//   - 404: an empty response and a nil error are returned, which is what decoding the node's
//     JSON error body produced before the status was checked, so callers that retry on empty
//     Data keep working. NOTE(review): assumes 404 means "no block for this id", as in the
//     beacon API spec — confirm for the node in use.
//   - anything else: an error naming the status is returned.
//
// The returned response is nil whenever the error is non-nil.
func fetchBlobs(beacon *BeaconClient, blockId string) (*BlobSidecarsResponse, error) {
	endpoint := beacon.Endpoint(BeaconBlobSidecarsPath, blockId)
	log.Debug("Fetching blobs", "endpoint", endpoint)
	resp, err := beaconHTTPClient.Get(endpoint)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusNotFound:
		log.Debug("No block found for blob request", "block", blockId)
		return &BlobSidecarsResponse{}, nil
	case resp.StatusCode != http.StatusOK:
		// The block id, not the endpoint, goes into the error: the URL may carry credentials.
		return nil, fmt.Errorf("fetching blob sidecars for block %q: unexpected status %s", blockId, resp.Status)
	}

	var obj BlobSidecarsResponse
	if err := json.NewDecoder(resp.Body).Decode(&obj); err != nil {
		return nil, fmt.Errorf("decoding blob sidecars for block %q: %w", blockId, err)
	}
	return &obj, nil
}