Implement Blobscan scraper #2

Merged
roysc merged 4 commits from scrape-blobscan into main 2024-07-01 14:01:28 +00:00
14 changed files with 375 additions and 86 deletions

View File

@ -13,8 +13,8 @@ env:
SYSTEM_TESTS_REF: roysc/test-blob-tx SYSTEM_TESTS_REF: roysc/test-blob-tx
jobs: jobs:
test: test-beacon-collector:
name: Run integration tests name: Run Beacon collector tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
@ -62,3 +62,36 @@ jobs:
pip3 install --no-deps 'eth-account>=0.12.3' pip3 install --no-deps 'eth-account>=0.12.3'
pip3 install 'pydantic>=2.0.0' pip3 install 'pydantic>=2.0.0'
python3 -m pytest -vv -k test_blob_tx python3 -m pytest -vv -k test_blob_tx
test-blobscan-scraper:
name: Run Blobscan scraper tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version-file: 'go.mod'
check-latest: true
- name: "Install redis-cli"
env:
DEBIAN_FRONTEND: noninteractive
run: apt-get update && apt-get install -y redis-tools
- name: "Run Blob DB"
working-directory: ./test
run: docker compose up eth-blob-db --wait --quiet-pull
# Check a mainnet blob
- name: "Run tests"
env:
TEST_BLOCK: 20000000
TEST_VHASH: 0x017ba4bd9c166498865a3d08618e333ee84812941b5c3a356971b4a6ffffa574
# First 10 bytes of blob data
EXPECTED_VALUE: 2100000675789c8cd37b0a
run: |
REDIS_PORT=$(docker port test-eth-blob-db-1 6379 | cut -d: -f2)
go run ./cmd/blobscan-scraper \
--redis-addr localhost:$REDIS_PORT --log-level debug \
--from-block $TEST_BLOCK --to-block $((TEST_BLOCK + 1))
VALUE=$(redis-cli -p $REDIS_PORT GETRANGE blob:$TEST_VHASH 0 9 | od -An -tx1 | tr -d ' \n')
[ "$EXPECTED_VALUE" = "$VALUE" ]

View File

@ -5,11 +5,11 @@ WORKDIR /eth-blob-indexer
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN go mod download
COPY . . COPY . .
RUN go build -o eth-blob-indexer . RUN go build -o beacon-blob-collector ./cmd/beacon-blob-collector
FROM alpine FROM alpine
WORKDIR /app WORKDIR /app
COPY --from=builder /eth-blob-indexer/eth-blob-indexer . COPY --from=builder /eth-blob-indexer/beacon-blob-collector .
ENTRYPOINT ["/app/eth-blob-indexer"] ENTRYPOINT ["/app/beacon-blob-collector"]

View File

@ -1,10 +1,19 @@
# eth-blob-indexer # eth-blob-indexer
A service that indexes all newly committed blobs on the Ethereum Beacon chain. Tools to index Ethereum blob data.
Example: ## beacon-blob-collector
Service that indexes all newly committed blobs on the Ethereum Beacon chain.
Build:
``` ```
$ eth-blob-indexer \ $ go build ./cmd/beacon-blob-collector
```
Run:
```
$ beacon-blob-collector \
--beacon-addr=http://fixturenet-eth-lighthouse-1:8001 \ --beacon-addr=http://fixturenet-eth-lighthouse-1:8001 \
--redis-addr=eth-blob-db:6379 \ --redis-addr=eth-blob-db:6379 \
--log-level=debug --log-level=debug
@ -19,3 +28,29 @@ May 29 17:16:16.098 DBG Received head event slot=240 block=0x304d37b405fb99913a9
May 29 17:16:16.098 DBG Fetching blobs endpoint=http://fixturenet-eth-lighthouse-1:8001/eth/v1/beacon/blob_sidecars/240 May 29 17:16:16.098 DBG Fetching blobs endpoint=http://fixturenet-eth-lighthouse-1:8001/eth/v1/beacon/blob_sidecars/240
May 29 17:16:16.107 INF Storing blob vhash=0x01265315cda2c869191467c60f7374dc80be8be32b8117cd780655014db819f6 May 29 17:16:16.107 INF Storing blob vhash=0x01265315cda2c869191467c60f7374dc80be8be32b8117cd780655014db819f6
``` ```
## blobscan-scraper
Service that scrapes the Blobscan API for ranges of past blobs. Useful to backfill data no longer
available from a Beacon node (assuming Blobscan has retained the data).
Build:
```
$ go build ./cmd/blobscan-scraper
```
Run:
```
$ blobscan-scraper --redis-addr localhost:57889 --log-level debug --block-start 20032451
Jun 7 18:11:01.619 INF Starting indexer blobscan=https://api.blobscan.com redis=localhost:57889
Jun 7 18:11:01.619 DBG Fetching block endpoint="https://api.blobscan.com/blocks?sort=asc&startBlock=20032451&endBlock=20032476"
Jun 7 18:11:03.115 DBG Processing block block=20032451
Jun 7 18:11:03.115 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490
Jun 7 18:11:04.665 INF Storing blob vhash=0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490
Jun 7 18:11:04.721 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3
Jun 7 18:11:05.726 INF Storing blob vhash=0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3
Jun 7 18:11:05.750 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e
Jun 7 18:11:06.663 INF Storing blob vhash=0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e
Jun 7 18:11:06.671 DBG Processing block block=20032453
Jun 7 18:11:06.671 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01badc50474c8e8353e4becf87147f14288198354948ef13a4b461f28624e87a
```

View File

@ -0,0 +1,35 @@
package main
import (
"context"
"log/slog"
"github.com/cerc-io/eth-blob-indexer/cmd"
"github.com/cerc-io/eth-blob-indexer/pkg/beacon"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
"github.com/go-redis/redis/v8"
)
const (
defaultBeaconAddr = "https://localhost:5052"
)
func main() {
var BeaconAddress string
cmd.Flags.StringVar(&BeaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node")
cmd.ParseFlags()
indexer := &beacon.BeaconClient{URL: BeaconAddress}
rdb := redis.NewClient(&redis.Options{
Addr: cmd.RedisAddress,
})
db := storage.NewRedisStorage(rdb)
slog.Info("Starting indexer", "beacon", BeaconAddress, "redis", cmd.RedisAddress)
err := indexer.CollectBlobs(context.Background(), db)
if err != nil {
panic(err)
}
}

View File

@ -0,0 +1,49 @@
package main
import (
"context"
"log/slog"
"github.com/cerc-io/eth-blob-indexer/cmd"
"github.com/cerc-io/eth-blob-indexer/pkg/blobscan"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
"github.com/go-redis/redis/v8"
)
const (
defaultBlobscanAddr = "https://api.blobscan.com"
)
func main() {
var BlobscanAddress string
var FromBlock, ToBlock uint64
var BlockStep uint64
cmd.Flags.StringVar(&BlobscanAddress,
"blobscan-addr", defaultBlobscanAddr, "Address of the Blobscan API")
cmd.Flags.Uint64Var(&FromBlock,
"from-block", 0, "EL block number to start scanning from")
cmd.Flags.Uint64Var(&ToBlock,
"to-block", 0, "EL block number to scan up to (0 for no limit))")
cmd.Flags.Uint64Var(&BlockStep,
"block-step", blobscan.BlobscanMaxResults, "Number of blocks to fetch per API call")
cmd.ParseFlags()
if BlockStep > blobscan.BlobscanMaxResults {
slog.Warn("Block step exceeds max results supported by API",
"block-step", BlockStep, "max", blobscan.BlobscanMaxResults)
}
indexer := &blobscan.BlobscanScraper{URL: BlobscanAddress, BlockStep: BlockStep}
rdb := redis.NewClient(&redis.Options{
Addr: cmd.RedisAddress,
})
db := storage.NewRedisStorage(rdb)
slog.Info("Starting indexer", "blobscan", BlobscanAddress, "redis", cmd.RedisAddress)
err := indexer.ScrapeBlobs(context.Background(), FromBlock, ToBlock, db)
if err != nil {
panic(err)
}
}

50
cmd/flags.go Normal file
View File

@ -0,0 +1,50 @@
package cmd
import (
"flag"
"log/slog"
"os"
"github.com/lmittmann/tint"
)
const (
defaultRedisAddr = "localhost:6379"
defaultRedisPassword = ""
)
var (
// Flags shared among CLI entrypoints
Flags *flag.FlagSet
RedisAddress string
RedisPassword string
LogLevel flagLevel
logLevelVar slog.LevelVar
)
func init() {
h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar})
slog.SetDefault(slog.New(h))
Flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError)
Flags.StringVar(&RedisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server")
Flags.StringVar(&RedisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server")
Flags.Var(&LogLevel, "log-level", "Log level (default INFO)")
}
type flagLevel struct{ slog.Level }
func (fl *flagLevel) Set(value string) error {
return fl.UnmarshalText([]byte(value))
}
func ParseFlags() {
err := Flags.Parse(os.Args[1:])
if err != nil {
panic(err)
}
logLevelVar.Set(LogLevel.Level)
}

69
main.go
View File

@ -1,69 +0,0 @@
package main
import (
"context"
"flag"
"log/slog"
"os"
indexer "github.com/cerc-io/eth-blob-indexer/pkg"
"github.com/go-redis/redis/v8"
"github.com/lmittmann/tint"
)
const (
defaultBeaconAddr = "https://localhost:5052"
defaultRedisAddr = "localhost:6379"
defaultRedisPassword = ""
)
var (
logLevelVar slog.LevelVar
)
func init() {
h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar})
slog.SetDefault(slog.New(h))
}
func main() {
// Parse command line arguments
var beaconAddress, redisAddress string
var redisPassword string
var logLevel flagLevel
var flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError)
flags.StringVar(&beaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node")
flags.StringVar(&redisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server")
flags.StringVar(&redisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server")
flags.Var(&logLevel, "log-level", "Log level (default INFO)")
err := flags.Parse(os.Args[1:])
if err != nil {
panic(err)
}
logLevelVar.Set(logLevel.Level)
beacon := &indexer.BeaconClient{URL: beaconAddress}
rdb := redis.NewClient(&redis.Options{
Addr: redisAddress,
})
db := indexer.NewRedisStorage(rdb)
// db := indexer.NewMapStorage()
slog.Info("Starting indexer", "beacon", beaconAddress, "redis", redisAddress)
err = beacon.CollectBlobs(context.Background(), db)
if err != nil {
panic(err)
}
}
type flagLevel struct{ slog.Level }
func (fl *flagLevel) Set(value string) error {
return fl.UnmarshalText([]byte(value))
}

View File

@ -1,4 +1,4 @@
package blob_indexer package beacon
import ( import (
"context" "context"

View File

@ -1,9 +1,10 @@
package blob_indexer package beacon
import "github.com/protolambda/zrnt/eth2/beacon/common" import "github.com/protolambda/zrnt/eth2/beacon/common"
type VersionedHash = common.Bytes32 type VersionedHash = common.Bytes32
// HeadEvent represents a "head" event from the Blobscan API
type HeadEvent struct { type HeadEvent struct {
Slot common.Slot `json:"slot"` Slot common.Slot `json:"slot"`
Block common.Root `json:"block"` Block common.Root `json:"block"`
@ -14,6 +15,7 @@ type HeadEvent struct {
ExecutionOptimistic bool `json:"execution_optimistic"` ExecutionOptimistic bool `json:"execution_optimistic"`
} }
// BlobSidecarEvent represents a "blob_sidecar" event from the Blobscan API
type BlobSidecarEvent struct { type BlobSidecarEvent struct {
BlockRoot common.Root `json:"block_root"` BlockRoot common.Root `json:"block_root"`
Index string `json:"index"` Index string `json:"index"`
@ -22,6 +24,7 @@ type BlobSidecarEvent struct {
VersionedHash VersionedHash `json:"versioned_hash"` VersionedHash VersionedHash `json:"versioned_hash"`
} }
// BlobSidecarsResponse represents a response from the Blobscan API blob_sidecars endpoint
type BlobSidecarsResponse struct { type BlobSidecarsResponse struct {
Data []BlobSidecar `json:"data"` Data []BlobSidecar `json:"data"`
} }

View File

@ -1,4 +1,4 @@
package blob_indexer package beacon
import ( import (
"context" "context"
@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
"github.com/protolambda/zrnt/eth2/beacon/common" "github.com/protolambda/zrnt/eth2/beacon/common"
) )
@ -18,6 +19,7 @@ var (
BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s" BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s"
) )
// BeaconClient subscribes to events from a beacon node and fetches newly available blobs.
type BeaconClient struct { type BeaconClient struct {
URL string URL string
} }
@ -26,7 +28,7 @@ func (bc *BeaconClient) Endpoint(path string, args ...any) string {
return fmt.Sprintf(bc.URL+path, args...) return fmt.Sprintf(bc.URL+path, args...)
} }
func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) error { func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db storage.KvStorage) error {
events := make(chan *Event) events := make(chan *Event)
subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events) subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events)
@ -43,6 +45,7 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro
// blob_sidecar events are received after validation over gossip, and the blobs they reference // blob_sidecar events are received after validation over gossip, and the blobs they reference
// will not be available until head is updated with the corresponding block. // will not be available until head is updated with the corresponding block.
// Reorgs should not cause an issue here as we aren't concerned with canonicity.
timeout := 5 * time.Minute timeout := 5 * time.Minute
for { for {
select { select {
@ -111,12 +114,12 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro
} }
case <-ctx.Done(): case <-ctx.Done():
log.Info("Context cancelled, exiting") log.Info("Context cancelled, exiting")
return nil return ctx.Err()
} }
} }
} }
func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error { func pushBlob(ctx context.Context, blob *BlobSidecar, db storage.KvStorage) error {
vhash := blob.KzgCommitment.ToVersionedHash() vhash := blob.KzgCommitment.ToVersionedHash()
data, err := hex.DecodeString(blob.Blob[2:]) data, err := hex.DecodeString(blob.Blob[2:])
if err != nil { if err != nil {
@ -125,7 +128,7 @@ func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error {
} }
log.Info("Storing blob", "vhash", vhash) log.Info("Storing blob", "vhash", vhash)
err = db.Set(ctx, BlobKeyFromHash(vhash), data, 0) err = db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0)
if err != nil { if err != nil {
log.Error("Error storing blob", "error", err) log.Error("Error storing blob", "error", err)
return err return err

33
pkg/blobscan/models.go Normal file
View File

@ -0,0 +1,33 @@
package blobscan
import "github.com/protolambda/zrnt/eth2/beacon/common"
type VersionedHash = common.Bytes32
// BlobscanBlock represents an EL block from the Blobscan API
type BlobscanBlock struct {
Slot common.Slot `json:"slot"`
Hash common.Root `json:"hash"`
Number uint64 `json:"number"`
Transactions []BlobscanBlockTransaction `json:"transactions"`
Timestamp string `json:"timestamp"`
}
// BlobscanBlockTransaction represents a transaction within a block from the Blobscan API
type BlobscanBlockTransaction struct {
Hash common.Root `json:"hash"`
Blobs []struct {
Index uint64 `json:"index"`
VersionedHash VersionedHash `json:"versionedHash"`
} `json:"blobs"`
Rollup string `json:"rollup"`
}
// BlobscanBlob represents a blob from the Blobscan API
type BlobscanBlob struct {
Commitment common.KZGCommitment `json:"commitment"`
Proof string `json:"proof"`
Size int `json:"size"`
VersionedHash VersionedHash `json:"versionedHash"`
Data string `json:"data"`
}

114
pkg/blobscan/service.go Normal file
View File

@ -0,0 +1,114 @@
package blobscan
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
log "log/slog"
"net/http"
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
)
const (
// Max number of objects returned per API call (same for blobs & blocks)
BlobscanMaxResults = 25
)
var (
BlobscanBlocksRangePath = "/blocks?sort=asc&startBlock=%d&endBlock=%d"
BlobscanBlobsPath = "/blobs/%s"
)
type BlobscanScraper struct {
URL string
BlockStep uint64
}
func (bs *BlobscanScraper) Endpoint(path string, args ...any) string {
return fmt.Sprintf(bs.URL+path, args...)
}
// ApiBlocksRange returns the API endpoint for fetching a (half-open) range of blocks
func (bs *BlobscanScraper) ApiBlocksRange(from, to uint64) string {
return bs.Endpoint(BlobscanBlocksRangePath, from, to)
}
func (bs *BlobscanScraper) ScrapeBlobs(ctx context.Context, from, to uint64, db storage.KvStorage) error {
for num := from; to == 0 || num <= to; num += bs.BlockStep {
if ctx.Err() != nil {
return ctx.Err()
}
end := min(to, num+bs.BlockStep)
endpoint := bs.ApiBlocksRange(num, end)
log.Debug("Fetching block", "endpoint", endpoint)
blocks, err := fetchBlocks(ctx, endpoint)
if err != nil {
log.Error("Failed to fetch blocks", "block", num, "error", err)
return err
}
for _, block := range blocks {
log.Debug("Processing block", "block", block.Number)
for _, tx := range block.Transactions {
for _, txblob := range tx.Blobs {
endpoint := bs.Endpoint(BlobscanBlobsPath, txblob.VersionedHash.String())
log.Debug("Fetching blob", "endpoint", endpoint)
blob, err := fetchBlob(ctx, endpoint)
if err != nil {
log.Error("Failed to fetch blob", "error", err, "endpoint", endpoint)
return err
}
data, err := hex.DecodeString(blob.Data[2:])
if err != nil {
log.Error("Error decoding blob", "error", err)
return err
}
if err := pushBlob(ctx, blob.VersionedHash, data, db); err != nil {
return err
}
}
}
}
}
return nil
}
func httpGetJson[T any](ctx context.Context, endpoint string, ret *T) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP %d", resp.StatusCode)
}
defer resp.Body.Close()
return json.NewDecoder(resp.Body).Decode(ret)
}
func fetchBlocks(ctx context.Context, endpoint string) ([]BlobscanBlock, error) {
var ret struct {
Blobs []BlobscanBlock `json:"blocks"`
}
return ret.Blobs, httpGetJson(ctx, endpoint, &ret)
}
func fetchBlob(ctx context.Context, endpoint string) (*BlobscanBlob, error) {
var ret BlobscanBlob
return &ret, httpGetJson(ctx, endpoint, &ret)
}
func pushBlob(ctx context.Context, vhash VersionedHash, data []byte, db storage.KvStorage) error {
log.Info("Storing blob", "vhash", vhash)
err := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0)
if err != nil {
log.Error("Error storing blob", "error", err)
}
return err
}

View File

@ -1,4 +1,4 @@
package blob_indexer package storage
import ( import (
"context" "context"

View File

@ -1,6 +1,9 @@
services: services:
eth-blob-indexer: eth-blob-indexer:
image: cerc/eth-blob-indexer:local image: cerc/eth-blob-indexer:local
build:
context: ..
dockerfile: Dockerfile
restart: on-failure restart: on-failure
depends_on: depends_on:
eth-blob-db: eth-blob-db:
@ -16,7 +19,7 @@ services:
ports: ports:
- 6379 - 6379
healthcheck: healthcheck:
test: [ "CMD", "redis-cli", "--raw", "incr", "ping" ] test: [ "CMD", "redis-cli", "--raw", "incr", "_ping" ]
networks: networks:
test_default: test_default: