115 lines
3.0 KiB
Go
115 lines
3.0 KiB
Go
|
package blobscan
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/hex"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
log "log/slog"
|
||
|
"net/http"
|
||
|
|
||
|
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
|
||
|
)
|
||
|
|
||
|
// BlobscanMaxResults is the maximum number of objects returned per API call
// (the same limit applies to blobs and blocks).
const BlobscanMaxResults = 25
|
||
|
|
||
|
// BlobscanBlocksRangePath is the fmt-style path for listing blocks in
// ascending order; it takes the start and end block numbers (two %d verbs).
var BlobscanBlocksRangePath = "/blocks?sort=asc&startBlock=%d&endBlock=%d"

// BlobscanBlobsPath is the fmt-style path for fetching a single blob; it
// takes one %s argument identifying the blob.
var BlobscanBlobsPath = "/blobs/%s"
|
||
|
|
||
|
// BlobscanScraper scrapes blobs from a Blobscan-style HTTP API.
type BlobscanScraper struct {
	// URL is the base URL that every endpoint path is appended to.
	URL string
	// BlockStep is the number of blocks the scraper advances per block-range
	// request.
	BlockStep uint64
}
|
||
|
|
||
|
func (bs *BlobscanScraper) Endpoint(path string, args ...any) string {
|
||
|
return fmt.Sprintf(bs.URL+path, args...)
|
||
|
}
|
||
|
|
||
|
// ApiBlocksRange returns the API endpoint for fetching a (half-open) range of blocks
|
||
|
func (bs *BlobscanScraper) ApiBlocksRange(from, to uint64) string {
|
||
|
return bs.Endpoint(BlobscanBlocksRangePath, from, to)
|
||
|
}
|
||
|
|
||
|
func (bs *BlobscanScraper) ScrapeBlobs(ctx context.Context, from, to uint64, db storage.KvStorage) error {
|
||
|
for num := from; to == 0 || num <= to; num += bs.BlockStep {
|
||
|
if ctx.Err() != nil {
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
end := min(to, num+bs.BlockStep)
|
||
|
endpoint := bs.ApiBlocksRange(num, end)
|
||
|
log.Debug("Fetching block", "endpoint", endpoint)
|
||
|
blocks, err := fetchBlocks(ctx, endpoint)
|
||
|
if err != nil {
|
||
|
log.Error("Failed to fetch blocks", "block", num, "error", err)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, block := range blocks {
|
||
|
log.Debug("Processing block", "block", block.Number)
|
||
|
for _, tx := range block.Transactions {
|
||
|
for _, txblob := range tx.Blobs {
|
||
|
endpoint := bs.Endpoint(BlobscanBlobsPath, txblob.VersionedHash.String())
|
||
|
log.Debug("Fetching blob", "endpoint", endpoint)
|
||
|
blob, err := fetchBlob(ctx, endpoint)
|
||
|
if err != nil {
|
||
|
log.Error("Failed to fetch blob", "error", err, "endpoint", endpoint)
|
||
|
return err
|
||
|
}
|
||
|
data, err := hex.DecodeString(blob.Data[2:])
|
||
|
if err != nil {
|
||
|
log.Error("Error decoding blob", "error", err)
|
||
|
return err
|
||
|
}
|
||
|
if err := pushBlob(ctx, blob.VersionedHash, data, db); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// httpGetJson performs an HTTP GET on endpoint using http.DefaultClient and
// decodes the JSON response body into ret.
//
// The request is bound to ctx. Any status other than 200 OK is reported as an
// error of the form "HTTP <code>". The response body is closed on every
// return path once a response has been received.
func httpGetJson[T any](ctx context.Context, endpoint string, ret *T) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Register the close before looking at the status code so that non-200
	// responses do not leak the body and its underlying connection.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP %d", resp.StatusCode)
	}
	return json.NewDecoder(resp.Body).Decode(ret)
}
|
||
|
|
||
|
func fetchBlocks(ctx context.Context, endpoint string) ([]BlobscanBlock, error) {
|
||
|
var ret struct {
|
||
|
Blobs []BlobscanBlock `json:"blocks"`
|
||
|
}
|
||
|
return ret.Blobs, httpGetJson(ctx, endpoint, &ret)
|
||
|
}
|
||
|
|
||
|
func fetchBlob(ctx context.Context, endpoint string) (*BlobscanBlob, error) {
|
||
|
var ret BlobscanBlob
|
||
|
return &ret, httpGetJson(ctx, endpoint, &ret)
|
||
|
}
|
||
|
|
||
|
func pushBlob(ctx context.Context, vhash VersionedHash, data []byte, db storage.KvStorage) error {
|
||
|
log.Info("Storing blob", "vhash", vhash)
|
||
|
err := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0)
|
||
|
if err != nil {
|
||
|
log.Error("Error storing blob", "error", err)
|
||
|
}
|
||
|
return err
|
||
|
}
|