eth-blob-indexer/pkg/blobscan/service.go
Roy Crihfield 1d1ee73ff2
Some checks failed
Tests / Run Blobscan scraper tests (push) Successful in 2m23s
Tests / Run Beacon collector tests (push) Failing after 22m23s
Implement Blobscan scraper (#2)
New utility to backfill data from Blobscan.

Reviewed-on: #2
2024-07-01 14:01:27 +00:00

115 lines
3.0 KiB
Go

package blobscan
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
log "log/slog"
"net/http"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
)
// BlobscanMaxResults is the maximum number of objects the Blobscan API
// returns for a single call. The same limit applies to blob and block
// listings.
const BlobscanMaxResults = 25
// BlobscanBlocksRangePath is the fmt format for the block-listing path. It
// takes the start and end block numbers, in that order, and asks for results
// sorted in ascending order.
var BlobscanBlocksRangePath = "/blocks?sort=asc&startBlock=%d&endBlock=%d"

// BlobscanBlobsPath is the fmt format for the single-blob path. It takes one
// string argument identifying the blob.
var BlobscanBlobsPath = "/blobs/%s"
// BlobscanScraper pulls block and blob data from a Blobscan HTTP API.
type BlobscanScraper struct {
	// URL is the base URL that every API path is appended to.
	URL string
	// BlockStep is the number of blocks the scraper advances per block-range
	// request.
	BlockStep uint64
}

// Endpoint builds a full request URL by formatting path with args (fmt verbs)
// and appending the result to the base URL.
//
// Only path is treated as a format string. The base URL is prepended
// verbatim, so percent-encoded characters in it (for example "%20") are not
// misread as formatting verbs.
func (bs *BlobscanScraper) Endpoint(path string, args ...any) string {
	return bs.URL + fmt.Sprintf(path, args...)
}
// ApiBlocksRange returns the API endpoint for fetching the blocks between
// from and to, built from BlobscanBlocksRangePath.
//
// NOTE(review): the range was originally described as half-open; whether the
// API treats the end block as exclusive is not visible here - confirm against
// the Blobscan API.
func (bs *BlobscanScraper) ApiBlocksRange(from, to uint64) string {
	rangeURL := bs.Endpoint(BlobscanBlocksRangePath, from, to)
	return rangeURL
}
// ScrapeBlobs walks the chain from block from in windows of bs.BlockStep
// blocks. For every block the API returns it fetches each blob referenced by
// the block's transactions, hex-decodes the blob data and stores it in db
// under the blob's versioned hash.
//
// A to of 0 means "no upper bound": scraping then continues until ctx is
// cancelled or a step fails. Otherwise the loop stops once the window start
// passes to.
//
// It returns the context error on cancellation, an error if bs.BlockStep is
// zero, or the first fetch, decode or storage error encountered.
func (bs *BlobscanScraper) ScrapeBlobs(ctx context.Context, from, to uint64, db storage.KvStorage) error {
	if bs.BlockStep == 0 {
		// A zero step never advances the window, which would re-request the
		// same range forever.
		return fmt.Errorf("blobscan: BlockStep must be greater than zero")
	}
	for num := from; to == 0 || num <= to; num += bs.BlockStep {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Clamp the window to the requested upper bound only when there is
		// one; with to == 0 (unbounded) a plain min() would pin the end of
		// every window to block 0.
		end := num + bs.BlockStep
		if to != 0 {
			end = min(to, end)
		}
		// NOTE(review): whether consecutive windows overlap at their shared
		// boundary depends on the API's end-block semantics - confirm.
		rangeEndpoint := bs.ApiBlocksRange(num, end)
		log.Debug("Fetching block", "endpoint", rangeEndpoint)
		blocks, err := fetchBlocks(ctx, rangeEndpoint)
		if err != nil {
			log.Error("Failed to fetch blocks", "block", num, "error", err)
			return err
		}
		for _, block := range blocks {
			log.Debug("Processing block", "block", block.Number)
			for _, tx := range block.Transactions {
				for _, txblob := range tx.Blobs {
					blobEndpoint := bs.Endpoint(BlobscanBlobsPath, txblob.VersionedHash.String())
					log.Debug("Fetching blob", "endpoint", blobEndpoint)
					blob, err := fetchBlob(ctx, blobEndpoint)
					if err != nil {
						log.Error("Failed to fetch blob", "error", err, "endpoint", blobEndpoint)
						return err
					}
					// The first two characters are skipped before decoding
					// (presumably a "0x" prefix - confirm against the API), so
					// anything shorter cannot be sliced safely.
					if len(blob.Data) < 2 {
						err := fmt.Errorf("blob data too short to decode: %q", blob.Data)
						log.Error("Error decoding blob", "error", err)
						return err
					}
					data, err := hex.DecodeString(blob.Data[2:])
					if err != nil {
						log.Error("Error decoding blob", "error", err)
						return err
					}
					if err := pushBlob(ctx, blob.VersionedHash, data, db); err != nil {
						return err
					}
				}
			}
		}
	}
	return nil
}
// httpGetJson issues a GET request to endpoint using http.DefaultClient and
// decodes the JSON response body into ret.
//
// The request is bound to ctx. Any status other than 200 OK is reported as an
// "HTTP <code>" error without decoding the body. The response body is closed
// on every path, including the non-200 one.
func httpGetJson[T any](ctx context.Context, endpoint string, ret *T) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Register the close before any early return so an error status does not
	// leak the body (and the connection behind it).
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP %d", resp.StatusCode)
	}
	return json.NewDecoder(resp.Body).Decode(ret)
}
// fetchBlocks GETs endpoint and returns the list found under the "blocks" key
// of the JSON response.
//
// The request is completed before the result is read: Go does not specify the
// evaluation order between a plain variable read and a function call within
// one return statement, so returning both in a single expression could hand
// back the slice as it was before decoding. On error the returned slice is
// nil.
func fetchBlocks(ctx context.Context, endpoint string) ([]BlobscanBlock, error) {
	var ret struct {
		Blocks []BlobscanBlock `json:"blocks"`
	}
	if err := httpGetJson(ctx, endpoint, &ret); err != nil {
		return nil, err
	}
	return ret.Blocks, nil
}
// fetchBlob GETs endpoint and decodes the JSON response into a BlobscanBlob.
// The returned pointer is always non-nil; when the error is non-nil the blob
// contents must not be relied upon.
func fetchBlob(ctx context.Context, endpoint string) (*BlobscanBlob, error) {
	blob := new(BlobscanBlob)
	err := httpGetJson(ctx, endpoint, blob)
	return blob, err
}
// pushBlob writes data to db under the key derived from vhash via
// storage.BlobKeyFromHash. It logs the attempt, and logs and returns any
// error reported by the store.
func pushBlob(ctx context.Context, vhash VersionedHash, data []byte, db storage.KvStorage) error {
	log.Info("Storing blob", "vhash", vhash)
	// NOTE(review): the trailing 0 is passed straight to db.Set; its meaning
	// (presumably an expiration) is defined by storage.KvStorage - confirm.
	if setErr := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0); setErr != nil {
		log.Error("Error storing blob", "error", setErr)
		return setErr
	}
	return nil
}