diff --git a/.gitea/workflows/test.yml b/.gitea/workflows/test.yml index 14ab979..add588c 100644 --- a/.gitea/workflows/test.yml +++ b/.gitea/workflows/test.yml @@ -13,8 +13,8 @@ env: SYSTEM_TESTS_REF: roysc/test-blob-tx jobs: - test: - name: Run integration tests + test-beacon-collector: + name: Run Beacon collector tests runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -62,3 +62,36 @@ jobs: pip3 install --no-deps 'eth-account>=0.12.3' pip3 install 'pydantic>=2.0.0' python3 -m pytest -vv -k test_blob_tx + + test-blobscan-scraper: + name: Run Blobscan scraper tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + check-latest: true + - name: "Install redis-cli" + env: + DEBIAN_FRONTEND: noninteractive + run: apt-get update && apt-get install -y redis-tools + + - name: "Run Blob DB" + working-directory: ./test + run: docker compose up eth-blob-db --wait --quiet-pull + # Check a mainnet blob + - name: "Run tests" + env: + TEST_BLOCK: 20000000 + TEST_VHASH: 0x017ba4bd9c166498865a3d08618e333ee84812941b5c3a356971b4a6ffffa574 + # First 10 bytes of blob data + EXPECTED_VALUE: 2100000675789c8cd37b0a + run: | + REDIS_PORT=$(docker port test-eth-blob-db-1 6379 | cut -d: -f2) + go run ./cmd/blobscan-scraper \ + --redis-addr localhost:$REDIS_PORT --log-level debug \ + --from-block $TEST_BLOCK --to-block $((TEST_BLOCK + 1)) + + VALUE=$(redis-cli -p $REDIS_PORT GETRANGE blob:$TEST_VHASH 0 9 | od -An -tx1 | tr -d ' \n') + [ "$EXPECTED_VALUE" = "$VALUE" ] diff --git a/Dockerfile b/Dockerfile index d12231a..1326015 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,11 +5,11 @@ WORKDIR /eth-blob-indexer COPY go.mod go.sum ./ RUN go mod download COPY . . -RUN go build -o eth-blob-indexer . +RUN go build -o beacon-blob-collector ./cmd/beacon-blob-collector FROM alpine WORKDIR /app -COPY --from=builder /eth-blob-indexer/eth-blob-indexer . +COPY --from=builder /eth-blob-indexer/beacon-blob-collector . -ENTRYPOINT ["/app/eth-blob-indexer"] +ENTRYPOINT ["/app/beacon-blob-collector"] diff --git a/README.md b/README.md index b8b13f8..523e5de 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,19 @@ # eth-blob-indexer -A service that indexes all newly committed blobs on the Ethereum Beacon chain. +Tools to index Ethereum blob data. -Example: +## beacon-blob-collector + +Service that indexes all newly committed blobs on the Ethereum Beacon chain. + +Build: ``` -$ eth-blob-indexer \ +$ go build ./cmd/beacon-blob-collector +``` + +Run: +``` +$ beacon-blob-collector \ --beacon-addr=http://fixturenet-eth-lighthouse-1:8001 \ --redis-addr=eth-blob-db:6379 \ --log-level=debug @@ -19,3 +28,29 @@ May 29 17:16:16.098 DBG Received head event slot=240 block=0x304d37b405fb99913a9 May 29 17:16:16.098 DBG Fetching blobs endpoint=http://fixturenet-eth-lighthouse-1:8001/eth/v1/beacon/blob_sidecars/240 May 29 17:16:16.107 INF Storing blob vhash=0x01265315cda2c869191467c60f7374dc80be8be32b8117cd780655014db819f6 ``` + +## blobscan-scraper + +Service that scrapes the Blobscan API for ranges of past blobs. Useful to backfill data no longer +available from a Beacon node (assuming Blobscan has retained the data). + +Build: +``` +$ go build ./cmd/blobscan-scraper +``` + +Run: +``` +$ blobscan-scraper --redis-addr localhost:57889 --log-level debug --block-start 20032451 +Jun 7 18:11:01.619 INF Starting indexer blobscan=https://api.blobscan.com redis=localhost:57889 +Jun 7 18:11:01.619 DBG Fetching block endpoint="https://api.blobscan.com/blocks?sort=asc&startBlock=20032451&endBlock=20032476" +Jun 7 18:11:03.115 DBG Processing block block=20032451 +Jun 7 18:11:03.115 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490 +Jun 7 18:11:04.665 INF Storing blob vhash=0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490 +Jun 7 18:11:04.721 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3 +Jun 7 18:11:05.726 INF Storing blob vhash=0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3 +Jun 7 18:11:05.750 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e +Jun 7 18:11:06.663 INF Storing blob vhash=0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e +Jun 7 18:11:06.671 DBG Processing block block=20032453 +Jun 7 18:11:06.671 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01badc50474c8e8353e4becf87147f14288198354948ef13a4b461f28624e87a +``` diff --git a/cmd/beacon-blob-collector/main.go b/cmd/beacon-blob-collector/main.go new file mode 100644 index 0000000..c8dad9a --- /dev/null +++ b/cmd/beacon-blob-collector/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + "log/slog" + + "github.com/cerc-io/eth-blob-indexer/cmd" + "github.com/cerc-io/eth-blob-indexer/pkg/beacon" + "github.com/cerc-io/eth-blob-indexer/pkg/storage" + + "github.com/go-redis/redis/v8" +) + +const ( + defaultBeaconAddr = "https://localhost:5052" +) + +func main() { + var BeaconAddress string + cmd.Flags.StringVar(&BeaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node") + cmd.ParseFlags() + + indexer := &beacon.BeaconClient{URL: BeaconAddress} + + rdb := redis.NewClient(&redis.Options{ + Addr: cmd.RedisAddress, + }) + db := storage.NewRedisStorage(rdb) + + slog.Info("Starting indexer", "beacon", BeaconAddress, "redis", cmd.RedisAddress) + err := indexer.CollectBlobs(context.Background(), db) + if err != nil { + panic(err) + } +} diff --git a/cmd/blobscan-scraper/main.go b/cmd/blobscan-scraper/main.go new file mode 100644 index 0000000..e0f2b33 --- /dev/null +++ b/cmd/blobscan-scraper/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "log/slog" + + "github.com/cerc-io/eth-blob-indexer/cmd" + "github.com/cerc-io/eth-blob-indexer/pkg/blobscan" + "github.com/cerc-io/eth-blob-indexer/pkg/storage" + + "github.com/go-redis/redis/v8" +) + +const ( + defaultBlobscanAddr = "https://api.blobscan.com" +) + +func main() { + var BlobscanAddress string + var FromBlock, ToBlock uint64 + var BlockStep uint64 + + cmd.Flags.StringVar(&BlobscanAddress, + "blobscan-addr", defaultBlobscanAddr, "Address of the Blobscan API") + cmd.Flags.Uint64Var(&FromBlock, + "from-block", 0, "EL block number to start scanning from") + cmd.Flags.Uint64Var(&ToBlock, + "to-block", 0, "EL block number to scan up to (0 for no limit))") + cmd.Flags.Uint64Var(&BlockStep, + "block-step", blobscan.BlobscanMaxResults, "Number of blocks to fetch per API call") + cmd.ParseFlags() + + if BlockStep > blobscan.BlobscanMaxResults { + slog.Warn("Block step exceeds max results supported by API", + "block-step", BlockStep, "max", blobscan.BlobscanMaxResults) + } + + indexer := &blobscan.BlobscanScraper{URL: BlobscanAddress, BlockStep: BlockStep} + rdb := redis.NewClient(&redis.Options{ + Addr: cmd.RedisAddress, + }) + db := storage.NewRedisStorage(rdb) + + slog.Info("Starting indexer", "blobscan", BlobscanAddress, "redis", cmd.RedisAddress) + err := indexer.ScrapeBlobs(context.Background(), FromBlock, ToBlock, db) + if err != nil { + panic(err) + } +} diff --git a/cmd/flags.go b/cmd/flags.go new file mode 100644 index 0000000..7fcecec --- /dev/null +++ b/cmd/flags.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "flag" + "log/slog" + "os" + + "github.com/lmittmann/tint" +) + +const ( + defaultRedisAddr = "localhost:6379" + defaultRedisPassword = "" +) + +var ( + // Flags shared among CLI entrypoints + Flags *flag.FlagSet + + RedisAddress string + RedisPassword string + LogLevel flagLevel + + logLevelVar slog.LevelVar +) + +func init() { + h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar}) + slog.SetDefault(slog.New(h)) + + Flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError) + Flags.StringVar(&RedisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server") + Flags.StringVar(&RedisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server") + Flags.Var(&LogLevel, "log-level", "Log level (default INFO)") +} + +type flagLevel struct{ slog.Level } + +func (fl *flagLevel) Set(value string) error { + return fl.UnmarshalText([]byte(value)) +} + +func ParseFlags() { + err := Flags.Parse(os.Args[1:]) + if err != nil { + panic(err) + } + + logLevelVar.Set(LogLevel.Level) +} diff --git a/main.go b/main.go deleted file mode 100644 index 49d3059..0000000 --- a/main.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "context" - "flag" - "log/slog" - "os" - - indexer "github.com/cerc-io/eth-blob-indexer/pkg" - - "github.com/go-redis/redis/v8" - "github.com/lmittmann/tint" -) - -const ( - defaultBeaconAddr = "https://localhost:5052" - defaultRedisAddr = "localhost:6379" - defaultRedisPassword = "" -) - -var ( - logLevelVar slog.LevelVar -) - -func init() { - h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar}) - slog.SetDefault(slog.New(h)) -} - -func main() { - // Parse command line arguments - var beaconAddress, redisAddress string - var redisPassword string - var logLevel flagLevel - - var flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError) - flags.StringVar(&beaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node") - flags.StringVar(&redisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server") - flags.StringVar(&redisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server") - flags.Var(&logLevel, "log-level", "Log level (default INFO)") - - err := flags.Parse(os.Args[1:]) - if err != nil { - panic(err) - } - - logLevelVar.Set(logLevel.Level) - - beacon := &indexer.BeaconClient{URL: beaconAddress} - - rdb := redis.NewClient(&redis.Options{ - Addr: redisAddress, - }) - db := indexer.NewRedisStorage(rdb) - - // db := indexer.NewMapStorage() - - slog.Info("Starting indexer", "beacon", beaconAddress, "redis", redisAddress) - err = beacon.CollectBlobs(context.Background(), db) - if err != nil { - panic(err) - } -} - -type flagLevel struct{ slog.Level } - -func (fl *flagLevel) Set(value string) error { - return fl.UnmarshalText([]byte(value)) -} diff --git a/pkg/events.go b/pkg/beacon/events.go similarity index 98% rename from pkg/events.go rename to pkg/beacon/events.go index 9f3ad39..83ff8d2 100644 --- a/pkg/events.go +++ b/pkg/beacon/events.go @@ -1,4 +1,4 @@ -package blob_indexer +package beacon import ( "context" diff --git a/pkg/models.go b/pkg/beacon/models.go similarity index 85% rename from pkg/models.go rename to pkg/beacon/models.go index 563d2b4..bdc7516 100644 --- a/pkg/models.go +++ b/pkg/beacon/models.go @@ -1,9 +1,10 @@ -package blob_indexer +package beacon import "github.com/protolambda/zrnt/eth2/beacon/common" type VersionedHash = common.Bytes32 +// HeadEvent represents a "head" event from the Blobscan API type HeadEvent struct { Slot common.Slot `json:"slot"` Block common.Root `json:"block"` @@ -14,6 +15,7 @@ type HeadEvent struct { ExecutionOptimistic bool `json:"execution_optimistic"` } +// BlobSidecarEvent represents a "blob_sidecar" event from the Blobscan API type BlobSidecarEvent struct { BlockRoot common.Root `json:"block_root"` Index string `json:"index"` @@ -22,6 +24,7 @@ type BlobSidecarEvent struct { VersionedHash VersionedHash `json:"versioned_hash"` } +// BlobSidecarsResponse represents a response from the Blobscan API blob_sidecars endpoint type BlobSidecarsResponse struct { Data []BlobSidecar `json:"data"` } diff --git a/pkg/service.go b/pkg/beacon/service.go similarity index 89% rename from pkg/service.go rename to pkg/beacon/service.go index 0f6de38..7e44c5f 100644 --- a/pkg/service.go +++ b/pkg/beacon/service.go @@ -1,4 +1,4 @@ -package blob_indexer +package beacon import ( "context" @@ -9,6 +9,7 @@ import ( "net/http" "time" + "github.com/cerc-io/eth-blob-indexer/pkg/storage" "github.com/protolambda/zrnt/eth2/beacon/common" ) @@ -18,6 +19,7 @@ var ( BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s" ) +// BeaconClient subscribes to events from a beacon node and fetches newly available blobs. type BeaconClient struct { URL string } @@ -26,7 +28,7 @@ func (bc *BeaconClient) Endpoint(path string, args ...any) string { return fmt.Sprintf(bc.URL+path, args...) } -func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) error { +func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db storage.KvStorage) error { events := make(chan *Event) subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events) @@ -43,6 +45,7 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro // blob_sidecar events are received after validation over gossip, and the blobs they reference // will not be available until head is updated with the corresponding block. + // Reorgs should not cause an issue here as we aren't concerned with canonicity. timeout := 5 * time.Minute for { select { @@ -111,12 +114,12 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro } case <-ctx.Done(): log.Info("Context cancelled, exiting") - return nil + return ctx.Err() } } } -func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error { +func pushBlob(ctx context.Context, blob *BlobSidecar, db storage.KvStorage) error { vhash := blob.KzgCommitment.ToVersionedHash() data, err := hex.DecodeString(blob.Blob[2:]) if err != nil { @@ -125,7 +128,7 @@ func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error { } log.Info("Storing blob", "vhash", vhash) - err = db.Set(ctx, BlobKeyFromHash(vhash), data, 0) + err = db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0) if err != nil { log.Error("Error storing blob", "error", err) return err diff --git a/pkg/blobscan/models.go b/pkg/blobscan/models.go new file mode 100644 index 0000000..4c9f9e0 --- /dev/null +++ b/pkg/blobscan/models.go @@ -0,0 +1,33 @@ +package blobscan + +import "github.com/protolambda/zrnt/eth2/beacon/common" + +type VersionedHash = common.Bytes32 + +// BlobscanBlock represents an EL block from the Blobscan API +type BlobscanBlock struct { + Slot common.Slot `json:"slot"` + Hash common.Root `json:"hash"` + Number uint64 `json:"number"` + Transactions []BlobscanBlockTransaction `json:"transactions"` + Timestamp string `json:"timestamp"` +} + +// BlobscanBlockTransaction represents a transaction within a block from the Blobscan API +type BlobscanBlockTransaction struct { + Hash common.Root `json:"hash"` + Blobs []struct { + Index uint64 `json:"index"` + VersionedHash VersionedHash `json:"versionedHash"` + } `json:"blobs"` + Rollup string `json:"rollup"` +} + +// BlobscanBlob represents a blob from the Blobscan API +type BlobscanBlob struct { + Commitment common.KZGCommitment `json:"commitment"` + Proof string `json:"proof"` + Size int `json:"size"` + VersionedHash VersionedHash `json:"versionedHash"` + Data string `json:"data"` +} diff --git a/pkg/blobscan/service.go b/pkg/blobscan/service.go new file mode 100644 index 0000000..169fb65 --- /dev/null +++ b/pkg/blobscan/service.go @@ -0,0 +1,114 @@ +package blobscan + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + log "log/slog" + "net/http" + + "github.com/cerc-io/eth-blob-indexer/pkg/storage" +) + +const ( + // Max number of objects returned per API call (same for blobs & blocks) + BlobscanMaxResults = 25 +) + +var ( + BlobscanBlocksRangePath = "/blocks?sort=asc&startBlock=%d&endBlock=%d" + BlobscanBlobsPath = "/blobs/%s" +) + +type BlobscanScraper struct { + URL string + BlockStep uint64 +} + +func (bs *BlobscanScraper) Endpoint(path string, args ...any) string { + return fmt.Sprintf(bs.URL+path, args...) +} + +// ApiBlocksRange returns the API endpoint for fetching a (half-open) range of blocks +func (bs *BlobscanScraper) ApiBlocksRange(from, to uint64) string { + return bs.Endpoint(BlobscanBlocksRangePath, from, to) +} + +func (bs *BlobscanScraper) ScrapeBlobs(ctx context.Context, from, to uint64, db storage.KvStorage) error { + for num := from; to == 0 || num <= to; num += bs.BlockStep { + if ctx.Err() != nil { + return ctx.Err() + } + end := min(to, num+bs.BlockStep) + endpoint := bs.ApiBlocksRange(num, end) + log.Debug("Fetching block", "endpoint", endpoint) + blocks, err := fetchBlocks(ctx, endpoint) + if err != nil { + log.Error("Failed to fetch blocks", "block", num, "error", err) + return err + } + + for _, block := range blocks { + log.Debug("Processing block", "block", block.Number) + for _, tx := range block.Transactions { + for _, txblob := range tx.Blobs { + endpoint := bs.Endpoint(BlobscanBlobsPath, txblob.VersionedHash.String()) + log.Debug("Fetching blob", "endpoint", endpoint) + blob, err := fetchBlob(ctx, endpoint) + if err != nil { + log.Error("Failed to fetch blob", "error", err, "endpoint", endpoint) + return err + } + data, err := hex.DecodeString(blob.Data[2:]) + if err != nil { + log.Error("Error decoding blob", "error", err) + return err + } + if err := pushBlob(ctx, blob.VersionedHash, data, db); err != nil { + return err + } + } + } + } + } + return nil +} + +func httpGetJson[T any](ctx context.Context, endpoint string, ret *T) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP %d", resp.StatusCode) + } + defer resp.Body.Close() + + return json.NewDecoder(resp.Body).Decode(ret) +} + +func fetchBlocks(ctx context.Context, endpoint string) ([]BlobscanBlock, error) { + var ret struct { + Blobs []BlobscanBlock `json:"blocks"` + } + return ret.Blobs, httpGetJson(ctx, endpoint, &ret) +} + +func fetchBlob(ctx context.Context, endpoint string) (*BlobscanBlob, error) { + var ret BlobscanBlob + return &ret, httpGetJson(ctx, endpoint, &ret) +} + +func pushBlob(ctx context.Context, vhash VersionedHash, data []byte, db storage.KvStorage) error { + log.Info("Storing blob", "vhash", vhash) + err := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0) + if err != nil { + log.Error("Error storing blob", "error", err) + } + return err +} diff --git a/pkg/storage.go b/pkg/storage/storage.go similarity index 97% rename from pkg/storage.go rename to pkg/storage/storage.go index 3506f82..0df26d4 100644 --- a/pkg/storage.go +++ b/pkg/storage/storage.go @@ -1,4 +1,4 @@ -package blob_indexer +package storage import ( "context" diff --git a/test/compose.yml b/test/compose.yml index 8047f75..391d91f 100644 --- a/test/compose.yml +++ b/test/compose.yml @@ -1,6 +1,9 @@ services: eth-blob-indexer: image: cerc/eth-blob-indexer:local + build: + context: .. + dockerfile: Dockerfile restart: on-failure depends_on: eth-blob-db: @@ -16,7 +19,7 @@ services: ports: - 6379 healthcheck: - test: [ "CMD", "redis-cli", "--raw", "incr", "ping" ] + test: [ "CMD", "redis-cli", "--raw", "incr", "_ping" ] networks: test_default: