package beacon import ( "context" "encoding/hex" "encoding/json" "fmt" log "log/slog" "net/http" "time" "github.com/cerc-io/eth-blob-indexer/pkg/storage" "github.com/protolambda/zrnt/eth2/beacon/common" ) var ( // BeaconNodeHealthPath = "/eth/v1/node/health" BeaconEventsPath = "/eth/v1/events?topics=%s" BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s" ) // BeaconClient subscribes to events from a beacon node and fetches newly available blobs. type BeaconClient struct { URL string } func (bc *BeaconClient) Endpoint(path string, args ...any) string { return fmt.Sprintf(bc.URL+path, args...) } func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db storage.KvStorage) error { events := make(chan *Event) subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events) defer subBlobs.Close() subHead := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "head"), events) defer subHead.Close() // Last seen head slot // TODO: initialize with current head var lastHead common.Slot // Blobs we know about but haven't fetched yet // TOOD: persist and load? unfetched := map[common.Slot]int{} // blob_sidecar events are received after validation over gossip, and the blobs they reference // will not be available until head is updated with the corresponding block. // Reorgs should not cause an issue here as we aren't concerned with canonicity. timeout := 5 * time.Minute for { select { case event := <-events: switch string(event.Event) { case "head": var headEvent HeadEvent if err := json.Unmarshal(event.Data, &headEvent); err != nil { log.Error("Error unmarshalling event data", "error", err) return err } log.Debug("Received head event", "slot", headEvent.Slot, "block", headEvent.Block) if headEvent.Slot <= lastHead { log.Error("Unexpected head slot", "slot", headEvent.Slot) } lastHead = headEvent.Slot case "blob_sidecar": var blobEvent BlobSidecarEvent if err := json.Unmarshal(event.Data, &blobEvent); err != nil { log.Error("Error unmarshalling event data", "error", err) return err } log.Debug("Received blob_sidecar event", "slot", blobEvent.Slot, "block", blobEvent.BlockRoot, "vhash", blobEvent.VersionedHash, ) unfetched[blobEvent.Slot]++ } var fetched []common.Slot for slot := range unfetched { if slot <= lastHead { resp, err := fetchBlobs(beacon, slot.String()) if err != nil { log.Error("Error fetching blobs", "error", err) return err } if len(resp.Data) == 0 { log.Error("No blobs in block", "slot", slot) } for _, blob := range resp.Data { if err := pushBlob(ctx, &blob, db); err != nil { return err } unfetched[slot]-- } } if unfetched[slot] == 0 { fetched = append(fetched, slot) } } for _, slot := range fetched { delete(unfetched, slot) } case <-time.After(timeout): log.Debug("No events received", "for", timeout) case err := <-subBlobs.Err(): if err != nil { return err } case err := <-subHead.Err(): if err != nil { return err } case <-ctx.Done(): log.Info("Context cancelled, exiting") return ctx.Err() } } } func pushBlob(ctx context.Context, blob *BlobSidecar, db storage.KvStorage) error { vhash := blob.KzgCommitment.ToVersionedHash() data, err := hex.DecodeString(blob.Blob[2:]) if err != nil { log.Error("Error decoding blob", "error", err) return err } log.Info("Storing blob", "vhash", vhash) err = db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0) if err != nil { log.Error("Error storing blob", "error", err) return err } return nil } func fetchBlobs(beacon *BeaconClient, blockId string) (*BlobSidecarsResponse, error) { endpoint := beacon.Endpoint(BeaconBlobSidecarsPath, blockId) log.Debug("Fetching blobs", "endpoint", endpoint) resp, err := http.Get(endpoint) if err != nil { return nil, err } defer resp.Body.Close() var obj BlobSidecarsResponse decoder := json.NewDecoder(resp.Body) return &obj, decoder.Decode(&obj) }