Implement Blobscan scraper #2
@ -13,8 +13,8 @@ env:
|
||||
SYSTEM_TESTS_REF: roysc/test-blob-tx
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Run integration tests
|
||||
test-beacon-collector:
|
||||
name: Run Beacon collector tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@ -62,3 +62,36 @@ jobs:
|
||||
pip3 install --no-deps 'eth-account>=0.12.3'
|
||||
pip3 install 'pydantic>=2.0.0'
|
||||
python3 -m pytest -vv -k test_blob_tx
|
||||
|
||||
test-blobscan-scraper:
|
||||
name: Run Blobscan scraper tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version-file: 'go.mod'
|
||||
check-latest: true
|
||||
- name: "Install redis-cli"
|
||||
env:
|
||||
DEBIAN_FRONTEND: noninteractive
|
||||
run: apt-get update && apt-get install -y redis-tools
|
||||
|
||||
- name: "Run Blob DB"
|
||||
working-directory: ./test
|
||||
run: docker compose up eth-blob-db --wait --quiet-pull
|
||||
# Check a mainnet blob
|
||||
- name: "Run tests"
|
||||
env:
|
||||
TEST_BLOCK: 20000000
|
||||
TEST_VHASH: 0x017ba4bd9c166498865a3d08618e333ee84812941b5c3a356971b4a6ffffa574
|
||||
# First 10 bytes of blob data
|
||||
EXPECTED_VALUE: 2100000675789c8cd37b0a
|
||||
run: |
|
||||
REDIS_PORT=$(docker port test-eth-blob-db-1 6379 | cut -d: -f2)
|
||||
go run ./cmd/blobscan-scraper \
|
||||
--redis-addr localhost:$REDIS_PORT --log-level debug \
|
||||
--from-block $TEST_BLOCK --to-block $((TEST_BLOCK + 1))
|
||||
|
||||
VALUE=$(redis-cli -p $REDIS_PORT GETRANGE blob:$TEST_VHASH 0 9 | od -An -tx1 | tr -d ' \n')
|
||||
[ "$EXPECTED_VALUE" = "$VALUE" ]
|
||||
|
@ -5,11 +5,11 @@ WORKDIR /eth-blob-indexer
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
COPY . .
|
||||
RUN go build -o eth-blob-indexer .
|
||||
RUN go build -o beacon-blob-collector ./cmd/beacon-blob-collector
|
||||
|
||||
FROM alpine
|
||||
|
||||
WORKDIR /app
|
||||
COPY --from=builder /eth-blob-indexer/eth-blob-indexer .
|
||||
COPY --from=builder /eth-blob-indexer/beacon-blob-collector .
|
||||
|
||||
ENTRYPOINT ["/app/eth-blob-indexer"]
|
||||
ENTRYPOINT ["/app/beacon-blob-collector"]
|
||||
|
41
README.md
41
README.md
@ -1,10 +1,19 @@
|
||||
# eth-blob-indexer
|
||||
|
||||
A service that indexes all newly committed blobs on the Ethereum Beacon chain.
|
||||
Tools to index Ethereum blob data.
|
||||
|
||||
Example:
|
||||
## beacon-blob-collector
|
||||
|
||||
Service that indexes all newly committed blobs on the Ethereum Beacon chain.
|
||||
|
||||
Build:
|
||||
```
|
||||
$ eth-blob-indexer \
|
||||
$ go build ./cmd/beacon-blob-collector
|
||||
```
|
||||
|
||||
Run:
|
||||
```
|
||||
$ beacon-blob-collector \
|
||||
--beacon-addr=http://fixturenet-eth-lighthouse-1:8001 \
|
||||
--redis-addr=eth-blob-db:6379 \
|
||||
--log-level=debug
|
||||
@ -19,3 +28,29 @@ May 29 17:16:16.098 DBG Received head event slot=240 block=0x304d37b405fb99913a9
|
||||
May 29 17:16:16.098 DBG Fetching blobs endpoint=http://fixturenet-eth-lighthouse-1:8001/eth/v1/beacon/blob_sidecars/240
|
||||
May 29 17:16:16.107 INF Storing blob vhash=0x01265315cda2c869191467c60f7374dc80be8be32b8117cd780655014db819f6
|
||||
```
|
||||
|
||||
## blobscan-scraper
|
||||
|
||||
Service that scrapes the Blobscan API for ranges of past blobs. Useful to backfill data no longer
|
||||
available from a Beacon node (assuming Blobscan has retained the data).
|
||||
|
||||
Build:
|
||||
```
|
||||
$ go build ./cmd/blobscan-scraper
|
||||
```
|
||||
|
||||
Run:
|
||||
```
|
||||
$ blobscan-scraper --redis-addr localhost:57889 --log-level debug --block-start 20032451
|
||||
Jun 7 18:11:01.619 INF Starting indexer blobscan=https://api.blobscan.com redis=localhost:57889
|
||||
Jun 7 18:11:01.619 DBG Fetching block endpoint="https://api.blobscan.com/blocks?sort=asc&startBlock=20032451&endBlock=20032476"
|
||||
Jun 7 18:11:03.115 DBG Processing block block=20032451
|
||||
Jun 7 18:11:03.115 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490
|
||||
Jun 7 18:11:04.665 INF Storing blob vhash=0x0126e8b373caaba7556c5f410efc445c64235ef048354467a3aaf97588ca8490
|
||||
Jun 7 18:11:04.721 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3
|
||||
Jun 7 18:11:05.726 INF Storing blob vhash=0x01b57cd55c617e40af61a916cbafbd596646c70f69d4685383dfd67966aeadb3
|
||||
Jun 7 18:11:05.750 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e
|
||||
Jun 7 18:11:06.663 INF Storing blob vhash=0x01c7ffd0638a0d4b1774668813b7d8740de5ccfaaaac83c333f63b8a7ffa695e
|
||||
Jun 7 18:11:06.671 DBG Processing block block=20032453
|
||||
Jun 7 18:11:06.671 DBG Fetching blob endpoint=https://api.blobscan.com/blobs/0x01badc50474c8e8353e4becf87147f14288198354948ef13a4b461f28624e87a
|
||||
```
|
||||
|
35
cmd/beacon-blob-collector/main.go
Normal file
35
cmd/beacon-blob-collector/main.go
Normal file
@ -0,0 +1,35 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/cerc-io/eth-blob-indexer/cmd"
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/beacon"
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBeaconAddr = "https://localhost:5052"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var BeaconAddress string
|
||||
cmd.Flags.StringVar(&BeaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node")
|
||||
cmd.ParseFlags()
|
||||
|
||||
indexer := &beacon.BeaconClient{URL: BeaconAddress}
|
||||
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: cmd.RedisAddress,
|
||||
})
|
||||
db := storage.NewRedisStorage(rdb)
|
||||
|
||||
slog.Info("Starting indexer", "beacon", BeaconAddress, "redis", cmd.RedisAddress)
|
||||
err := indexer.CollectBlobs(context.Background(), db)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
49
cmd/blobscan-scraper/main.go
Normal file
49
cmd/blobscan-scraper/main.go
Normal file
@ -0,0 +1,49 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/cerc-io/eth-blob-indexer/cmd"
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/blobscan"
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBlobscanAddr = "https://api.blobscan.com"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var BlobscanAddress string
|
||||
var FromBlock, ToBlock uint64
|
||||
var BlockStep uint64
|
||||
|
||||
cmd.Flags.StringVar(&BlobscanAddress,
|
||||
"blobscan-addr", defaultBlobscanAddr, "Address of the Blobscan API")
|
||||
cmd.Flags.Uint64Var(&FromBlock,
|
||||
"from-block", 0, "EL block number to start scanning from")
|
||||
cmd.Flags.Uint64Var(&ToBlock,
|
||||
"to-block", 0, "EL block number to scan up to (0 for no limit))")
|
||||
cmd.Flags.Uint64Var(&BlockStep,
|
||||
"block-step", blobscan.BlobscanMaxResults, "Number of blocks to fetch per API call")
|
||||
cmd.ParseFlags()
|
||||
|
||||
if BlockStep > blobscan.BlobscanMaxResults {
|
||||
slog.Warn("Block step exceeds max results supported by API",
|
||||
"block-step", BlockStep, "max", blobscan.BlobscanMaxResults)
|
||||
}
|
||||
|
||||
indexer := &blobscan.BlobscanScraper{URL: BlobscanAddress, BlockStep: BlockStep}
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: cmd.RedisAddress,
|
||||
})
|
||||
db := storage.NewRedisStorage(rdb)
|
||||
|
||||
slog.Info("Starting indexer", "blobscan", BlobscanAddress, "redis", cmd.RedisAddress)
|
||||
err := indexer.ScrapeBlobs(context.Background(), FromBlock, ToBlock, db)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
50
cmd/flags.go
Normal file
50
cmd/flags.go
Normal file
@ -0,0 +1,50 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"github.com/lmittmann/tint"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultRedisAddr = "localhost:6379"
|
||||
defaultRedisPassword = ""
|
||||
)
|
||||
|
||||
var (
|
||||
// Flags shared among CLI entrypoints
|
||||
Flags *flag.FlagSet
|
||||
|
||||
RedisAddress string
|
||||
RedisPassword string
|
||||
LogLevel flagLevel
|
||||
|
||||
logLevelVar slog.LevelVar
|
||||
)
|
||||
|
||||
func init() {
|
||||
h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar})
|
||||
slog.SetDefault(slog.New(h))
|
||||
|
||||
Flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError)
|
||||
Flags.StringVar(&RedisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server")
|
||||
Flags.StringVar(&RedisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server")
|
||||
Flags.Var(&LogLevel, "log-level", "Log level (default INFO)")
|
||||
}
|
||||
|
||||
type flagLevel struct{ slog.Level }
|
||||
|
||||
func (fl *flagLevel) Set(value string) error {
|
||||
return fl.UnmarshalText([]byte(value))
|
||||
}
|
||||
|
||||
func ParseFlags() {
|
||||
err := Flags.Parse(os.Args[1:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
logLevelVar.Set(LogLevel.Level)
|
||||
}
|
69
main.go
69
main.go
@ -1,69 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
indexer "github.com/cerc-io/eth-blob-indexer/pkg"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/lmittmann/tint"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBeaconAddr = "https://localhost:5052"
|
||||
defaultRedisAddr = "localhost:6379"
|
||||
defaultRedisPassword = ""
|
||||
)
|
||||
|
||||
var (
|
||||
logLevelVar slog.LevelVar
|
||||
)
|
||||
|
||||
func init() {
|
||||
h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar})
|
||||
slog.SetDefault(slog.New(h))
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Parse command line arguments
|
||||
var beaconAddress, redisAddress string
|
||||
var redisPassword string
|
||||
var logLevel flagLevel
|
||||
|
||||
var flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError)
|
||||
flags.StringVar(&beaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node")
|
||||
flags.StringVar(&redisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server")
|
||||
flags.StringVar(&redisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server")
|
||||
flags.Var(&logLevel, "log-level", "Log level (default INFO)")
|
||||
|
||||
err := flags.Parse(os.Args[1:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
logLevelVar.Set(logLevel.Level)
|
||||
|
||||
beacon := &indexer.BeaconClient{URL: beaconAddress}
|
||||
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddress,
|
||||
})
|
||||
db := indexer.NewRedisStorage(rdb)
|
||||
|
||||
// db := indexer.NewMapStorage()
|
||||
|
||||
slog.Info("Starting indexer", "beacon", beaconAddress, "redis", redisAddress)
|
||||
err = beacon.CollectBlobs(context.Background(), db)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
type flagLevel struct{ slog.Level }
|
||||
|
||||
func (fl *flagLevel) Set(value string) error {
|
||||
return fl.UnmarshalText([]byte(value))
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package blob_indexer
|
||||
package beacon
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,9 +1,10 @@
|
||||
package blob_indexer
|
||||
package beacon
|
||||
|
||||
import "github.com/protolambda/zrnt/eth2/beacon/common"
|
||||
|
||||
type VersionedHash = common.Bytes32
|
||||
|
||||
// HeadEvent represents a "head" event from the Blobscan API
|
||||
type HeadEvent struct {
|
||||
Slot common.Slot `json:"slot"`
|
||||
Block common.Root `json:"block"`
|
||||
@ -14,6 +15,7 @@ type HeadEvent struct {
|
||||
ExecutionOptimistic bool `json:"execution_optimistic"`
|
||||
}
|
||||
|
||||
// BlobSidecarEvent represents a "blob_sidecar" event from the Blobscan API
|
||||
type BlobSidecarEvent struct {
|
||||
BlockRoot common.Root `json:"block_root"`
|
||||
Index string `json:"index"`
|
||||
@ -22,6 +24,7 @@ type BlobSidecarEvent struct {
|
||||
VersionedHash VersionedHash `json:"versioned_hash"`
|
||||
}
|
||||
|
||||
// BlobSidecarsResponse represents a response from the Blobscan API blob_sidecars endpoint
|
||||
type BlobSidecarsResponse struct {
|
||||
Data []BlobSidecar `json:"data"`
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package blob_indexer
|
||||
package beacon
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
|
||||
"github.com/protolambda/zrnt/eth2/beacon/common"
|
||||
)
|
||||
|
||||
@ -18,6 +19,7 @@ var (
|
||||
BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s"
|
||||
)
|
||||
|
||||
// BeaconClient subscribes to events from a beacon node and fetches newly available blobs.
|
||||
type BeaconClient struct {
|
||||
URL string
|
||||
}
|
||||
@ -26,7 +28,7 @@ func (bc *BeaconClient) Endpoint(path string, args ...any) string {
|
||||
return fmt.Sprintf(bc.URL+path, args...)
|
||||
}
|
||||
|
||||
func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) error {
|
||||
func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db storage.KvStorage) error {
|
||||
events := make(chan *Event)
|
||||
|
||||
subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events)
|
||||
@ -43,6 +45,7 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro
|
||||
|
||||
// blob_sidecar events are received after validation over gossip, and the blobs they reference
|
||||
// will not be available until head is updated with the corresponding block.
|
||||
// Reorgs should not cause an issue here as we aren't concerned with canonicity.
|
||||
timeout := 5 * time.Minute
|
||||
for {
|
||||
select {
|
||||
@ -111,12 +114,12 @@ func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) erro
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Info("Context cancelled, exiting")
|
||||
return nil
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error {
|
||||
func pushBlob(ctx context.Context, blob *BlobSidecar, db storage.KvStorage) error {
|
||||
vhash := blob.KzgCommitment.ToVersionedHash()
|
||||
data, err := hex.DecodeString(blob.Blob[2:])
|
||||
if err != nil {
|
||||
@ -125,7 +128,7 @@ func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error {
|
||||
}
|
||||
|
||||
log.Info("Storing blob", "vhash", vhash)
|
||||
err = db.Set(ctx, BlobKeyFromHash(vhash), data, 0)
|
||||
err = db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0)
|
||||
if err != nil {
|
||||
log.Error("Error storing blob", "error", err)
|
||||
return err
|
33
pkg/blobscan/models.go
Normal file
33
pkg/blobscan/models.go
Normal file
@ -0,0 +1,33 @@
|
||||
package blobscan
|
||||
|
||||
import "github.com/protolambda/zrnt/eth2/beacon/common"
|
||||
|
||||
type VersionedHash = common.Bytes32
|
||||
|
||||
// BlobscanBlock represents an EL block from the Blobscan API
|
||||
type BlobscanBlock struct {
|
||||
Slot common.Slot `json:"slot"`
|
||||
Hash common.Root `json:"hash"`
|
||||
Number uint64 `json:"number"`
|
||||
Transactions []BlobscanBlockTransaction `json:"transactions"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
}
|
||||
|
||||
// BlobscanBlockTransaction represents a transaction within a block from the Blobscan API
|
||||
type BlobscanBlockTransaction struct {
|
||||
Hash common.Root `json:"hash"`
|
||||
Blobs []struct {
|
||||
Index uint64 `json:"index"`
|
||||
VersionedHash VersionedHash `json:"versionedHash"`
|
||||
} `json:"blobs"`
|
||||
Rollup string `json:"rollup"`
|
||||
}
|
||||
|
||||
// BlobscanBlob represents a blob from the Blobscan API
|
||||
type BlobscanBlob struct {
|
||||
Commitment common.KZGCommitment `json:"commitment"`
|
||||
Proof string `json:"proof"`
|
||||
Size int `json:"size"`
|
||||
VersionedHash VersionedHash `json:"versionedHash"`
|
||||
Data string `json:"data"`
|
||||
}
|
114
pkg/blobscan/service.go
Normal file
114
pkg/blobscan/service.go
Normal file
@ -0,0 +1,114 @@
|
||||
package blobscan
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
log "log/slog"
|
||||
"net/http"
|
||||
|
||||
"github.com/cerc-io/eth-blob-indexer/pkg/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
// Max number of objects returned per API call (same for blobs & blocks)
|
||||
BlobscanMaxResults = 25
|
||||
)
|
||||
|
||||
var (
|
||||
BlobscanBlocksRangePath = "/blocks?sort=asc&startBlock=%d&endBlock=%d"
|
||||
BlobscanBlobsPath = "/blobs/%s"
|
||||
)
|
||||
|
||||
type BlobscanScraper struct {
|
||||
URL string
|
||||
BlockStep uint64
|
||||
}
|
||||
|
||||
func (bs *BlobscanScraper) Endpoint(path string, args ...any) string {
|
||||
return fmt.Sprintf(bs.URL+path, args...)
|
||||
}
|
||||
|
||||
// ApiBlocksRange returns the API endpoint for fetching a (half-open) range of blocks
|
||||
func (bs *BlobscanScraper) ApiBlocksRange(from, to uint64) string {
|
||||
return bs.Endpoint(BlobscanBlocksRangePath, from, to)
|
||||
}
|
||||
|
||||
func (bs *BlobscanScraper) ScrapeBlobs(ctx context.Context, from, to uint64, db storage.KvStorage) error {
|
||||
for num := from; to == 0 || num <= to; num += bs.BlockStep {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
end := min(to, num+bs.BlockStep)
|
||||
endpoint := bs.ApiBlocksRange(num, end)
|
||||
log.Debug("Fetching block", "endpoint", endpoint)
|
||||
blocks, err := fetchBlocks(ctx, endpoint)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch blocks", "block", num, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, block := range blocks {
|
||||
log.Debug("Processing block", "block", block.Number)
|
||||
for _, tx := range block.Transactions {
|
||||
for _, txblob := range tx.Blobs {
|
||||
endpoint := bs.Endpoint(BlobscanBlobsPath, txblob.VersionedHash.String())
|
||||
log.Debug("Fetching blob", "endpoint", endpoint)
|
||||
blob, err := fetchBlob(ctx, endpoint)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch blob", "error", err, "endpoint", endpoint)
|
||||
return err
|
||||
}
|
||||
data, err := hex.DecodeString(blob.Data[2:])
|
||||
if err != nil {
|
||||
log.Error("Error decoding blob", "error", err)
|
||||
return err
|
||||
}
|
||||
if err := pushBlob(ctx, blob.VersionedHash, data, db); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func httpGetJson[T any](ctx context.Context, endpoint string, ret *T) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("HTTP %d", resp.StatusCode)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return json.NewDecoder(resp.Body).Decode(ret)
|
||||
}
|
||||
|
||||
func fetchBlocks(ctx context.Context, endpoint string) ([]BlobscanBlock, error) {
|
||||
var ret struct {
|
||||
Blobs []BlobscanBlock `json:"blocks"`
|
||||
}
|
||||
return ret.Blobs, httpGetJson(ctx, endpoint, &ret)
|
||||
}
|
||||
|
||||
func fetchBlob(ctx context.Context, endpoint string) (*BlobscanBlob, error) {
|
||||
var ret BlobscanBlob
|
||||
return &ret, httpGetJson(ctx, endpoint, &ret)
|
||||
}
|
||||
|
||||
func pushBlob(ctx context.Context, vhash VersionedHash, data []byte, db storage.KvStorage) error {
|
||||
log.Info("Storing blob", "vhash", vhash)
|
||||
err := db.Set(ctx, storage.BlobKeyFromHash(vhash), data, 0)
|
||||
if err != nil {
|
||||
log.Error("Error storing blob", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package blob_indexer
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,6 +1,9 @@
|
||||
services:
|
||||
eth-blob-indexer:
|
||||
image: cerc/eth-blob-indexer:local
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: Dockerfile
|
||||
restart: on-failure
|
||||
depends_on:
|
||||
eth-blob-db:
|
||||
@ -16,7 +19,7 @@ services:
|
||||
ports:
|
||||
- 6379
|
||||
healthcheck:
|
||||
test: [ "CMD", "redis-cli", "--raw", "incr", "ping" ]
|
||||
test: [ "CMD", "redis-cli", "--raw", "incr", "_ping" ]
|
||||
|
||||
networks:
|
||||
test_default:
|
||||
|
Loading…
Reference in New Issue
Block a user