commit 3d8d84df86d85ad76cb7f5fa07eb09cea65e5c44 Author: Roy Crihfield Date: Sat May 18 01:22:23 2024 +0800 Initial service Dockerfile & compose file CI workflow diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..70d2a02 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +Dockerfile +.git \ No newline at end of file diff --git a/.gitea/workflows/test.yml b/.gitea/workflows/test.yml new file mode 100644 index 0000000..b66a479 --- /dev/null +++ b/.gitea/workflows/test.yml @@ -0,0 +1,65 @@ +name: Tests + +on: + pull_request: + branches: '*' + push: + branches: + - main + - ci-test + +env: + SO_VERSION: roysc/fix-eth-stacks + SYSTEM_TESTS_REF: roysc/test-blob-tx + +jobs: + test: + name: Run integration tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + check-latest: true + - name: "Build server image" + run: docker build . -t cerc/eth-blob-indexer:local + - name: "Install Python" + uses: actions/setup-python@v4 + with: + python-version: 3.11 + + - name: "Install stack-orchestrator" + uses: actions/checkout@v3 + with: + repository: cerc-io/stack-orchestrator + ref: ${{ env.SO_VERSION }} + path: ./stack-orchestrator + - run: pip install ./stack-orchestrator + + - name: "Clone system-tests" + uses: actions/checkout@v3 + with: + repository: cerc-io/system-tests + ref: ${{ env.SYSTEM_TESTS_REF }} + path: ./system-tests + - name: "Install pytest" + working-directory: ./system-tests + run: pip3 install pytest + + - name: "Run fixturenet stack" + run: ./scripts/integration-setup.sh + env: + SKIP_BUILD: 1 + - name: "Run server" + run: docker compose -f test/compose.yml up --wait --quiet-pull + - name: "Run tests" + working-directory: ./system-tests + # Work around dependency conflicts: + # - web3 uses an older eth-account until (unreleased) v7 + # - old pydantic is pinned by stack-orchestrator + run: | + pip3 install -r requirements.txt + pip3 install --no-deps 'eth-account>=0.12.3' + pip3 install 'pydantic>=2.0.0' + python3 -m pytest -vv -k test_blob_tx diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d12231a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:1.22-alpine as builder + +WORKDIR /eth-blob-indexer + +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN go build -o eth-blob-indexer . + +FROM alpine + +WORKDIR /app +COPY --from=builder /eth-blob-indexer/eth-blob-indexer . + +ENTRYPOINT ["/app/eth-blob-indexer"] diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..00696a5 --- /dev/null +++ b/doc.go @@ -0,0 +1,6 @@ +package main + +// Usage: indexer \ +// --beacon-addr=https://localhost:5052 \ +// --redis-addr=localhost:6379 --redis-password=secret --redis-db=0 \ +// --log-level=debug diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..722bf45 --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module github.com/cerc-io/eth-blob-indexer + +go 1.22 + +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/lmittmann/tint v1.0.4 + github.com/protolambda/zrnt v0.32.3 + github.com/r3labs/sse/v2 v2.10.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/holiman/uint256 v1.2.0 // indirect + github.com/kilic/bls12-381 v0.1.0 // indirect + github.com/minio/sha256-simd v0.1.0 // indirect + github.com/protolambda/bls12-381-util v0.1.0 // indirect + github.com/protolambda/ztyp v0.2.2 // indirect + golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect + golang.org/x/sys v0.17.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect + gopkg.in/yaml.v3 v3.0.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ab251da --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM= +github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= +github.com/kilic/bls12-381 v0.1.0 h1:encrdjqKMEvabVQ7qYOKu1OvhqpK4s47wDYtNiPtlp4= +github.com/kilic/bls12-381 v0.1.0/go.mod h1:vDTTHJONJ6G+P2R74EhnyotQDTliQDnFEwhdmfzw1ig= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/minio/sha256-simd v0.1.0 h1:U41/2erhAKcmSI14xh/ZTUdBPOzDOIfS93ibzUSl8KM= +github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/protolambda/bls12-381-util v0.1.0 h1:05DU2wJN7DTU7z28+Q+zejXkIsA/MF8JZQGhtBZZiWk= +github.com/protolambda/bls12-381-util v0.1.0/go.mod h1:cdkysJTRpeFeuUVx/TXGDQNMTiRAalk1vQw3TYTHcE4= +github.com/protolambda/zrnt v0.32.3 h1:b3mkBEjcmxtft115cBIQk+2qz1HEb2ExDdduVQqN4v0= +github.com/protolambda/zrnt v0.32.3/go.mod h1:A0fezkp9Tt3GBLATSPIbuY4ywYESyAuc/FFmPKg8Lqs= +github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY= +github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201101102859-da207088b7d1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..49d3059 --- /dev/null +++ b/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "flag" + "log/slog" + "os" + + indexer "github.com/cerc-io/eth-blob-indexer/pkg" + + "github.com/go-redis/redis/v8" + "github.com/lmittmann/tint" +) + +const ( + defaultBeaconAddr = "https://localhost:5052" + defaultRedisAddr = "localhost:6379" + defaultRedisPassword = "" +) + +var ( + logLevelVar slog.LevelVar +) + +func init() { + h := tint.NewHandler(os.Stderr, &tint.Options{Level: &logLevelVar}) + slog.SetDefault(slog.New(h)) +} + +func main() { + // Parse command line arguments + var beaconAddress, redisAddress string + var redisPassword string + var logLevel flagLevel + + var flags = flag.NewFlagSet("blob-indexer", flag.ExitOnError) + flags.StringVar(&beaconAddress, "beacon-addr", defaultBeaconAddr, "Address of the beacon node") + flags.StringVar(&redisAddress, "redis-addr", defaultRedisAddr, "Address of the Redis server") + flags.StringVar(&redisPassword, "redis-password", defaultRedisPassword, "Password for the Redis server") + flags.Var(&logLevel, "log-level", "Log level (default INFO)") + + err := flags.Parse(os.Args[1:]) + if err != nil { + panic(err) + } + + logLevelVar.Set(logLevel.Level) + + beacon := &indexer.BeaconClient{URL: beaconAddress} + + rdb := redis.NewClient(&redis.Options{ + Addr: redisAddress, + }) + db := indexer.NewRedisStorage(rdb) + + // db := indexer.NewMapStorage() + + slog.Info("Starting indexer", "beacon", beaconAddress, "redis", redisAddress) + err = beacon.CollectBlobs(context.Background(), db) + if err != nil { + panic(err) + } +} + +type flagLevel struct{ slog.Level } + +func (fl *flagLevel) Set(value string) error { + return fl.UnmarshalText([]byte(value)) +} diff --git a/pkg/events.go b/pkg/events.go new file mode 100644 index 0000000..9f3ad39 --- /dev/null +++ b/pkg/events.go @@ -0,0 +1,57 @@ +package blob_indexer + +import ( + "context" + log "log/slog" + "time" + + "github.com/r3labs/sse/v2" +) + +type Event = sse.Event + +type EventSubscription struct { + client *sse.Client + rawChan chan *sse.Event + errChan chan error +} + +func (se *EventSubscription) Close() { + if nil == se.client { + return + } + + log.Debug("Disconnecting and destroying SSE client", "endpoint", se.client.URL) + se.client.Unsubscribe(se.rawChan) + se.client.Connection.CloseIdleConnections() + se.client = nil +} + +func (se *EventSubscription) Err() <-chan error { + return se.errChan +} + +func CreateSubscription(ctx context.Context, endpoint string, sink chan *Event) *EventSubscription { + client := sse.NewClient(endpoint) + errChan := make(chan error) + listen := func() { + errChan <- client.SubscribeChanRawWithContext(ctx, sink) + } + + client.ReconnectNotify = func(err error, duration time.Duration) { + log.Debug("Reconnecting SSE client", "endpoint", endpoint, + "error", err, "duration", duration) + } + client.OnDisconnect(func(c *sse.Client) { + log.Debug("SSE client disconnected", "endpoint", endpoint) + }) + + log.Debug("Subscribing to event stream", "endpoint", endpoint) + go listen() + + return &EventSubscription{ + client: client, + rawChan: sink, + errChan: errChan, + } +} diff --git a/pkg/models.go b/pkg/models.go new file mode 100644 index 0000000..563d2b4 --- /dev/null +++ b/pkg/models.go @@ -0,0 +1,36 @@ +package blob_indexer + +import "github.com/protolambda/zrnt/eth2/beacon/common" + +type VersionedHash = common.Bytes32 + +type HeadEvent struct { + Slot common.Slot `json:"slot"` + Block common.Root `json:"block"` + State common.Root `json:"state"` + CurrentDutyDependentRoot common.Root `json:"current_duty_dependent_root"` + PreviousDutyDependentRoot common.Root `json:"previous_duty_dependent_root"` + EpochTransition bool `json:"epoch_transition"` + ExecutionOptimistic bool `json:"execution_optimistic"` +} + +type BlobSidecarEvent struct { + BlockRoot common.Root `json:"block_root"` + Index string `json:"index"` + Slot common.Slot `json:"slot"` + KzgCommitment common.KZGCommitment `json:"kzg_commitment"` + VersionedHash VersionedHash `json:"versioned_hash"` +} + +type BlobSidecarsResponse struct { + Data []BlobSidecar `json:"data"` +} + +type BlobSidecar struct { + Index string `json:"index"` + Blob string `json:"blob"` + KzgCommitment common.KZGCommitment `json:"kzg_commitment"` + KzgProof string `json:"kzg_proof"` + SignedBlockHeader common.SignedBeaconBlockHeader `json:"signed_block_header"` + KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"` +} diff --git a/pkg/service.go b/pkg/service.go new file mode 100644 index 0000000..0f6de38 --- /dev/null +++ b/pkg/service.go @@ -0,0 +1,148 @@ +package blob_indexer + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + log "log/slog" + "net/http" + "time" + + "github.com/protolambda/zrnt/eth2/beacon/common" +) + +var ( + // BeaconNodeHealthPath = "/eth/v1/node/health" + BeaconEventsPath = "/eth/v1/events?topics=%s" + BeaconBlobSidecarsPath = "/eth/v1/beacon/blob_sidecars/%s" +) + +type BeaconClient struct { + URL string +} + +func (bc *BeaconClient) Endpoint(path string, args ...any) string { + return fmt.Sprintf(bc.URL+path, args...) +} + +func (beacon *BeaconClient) CollectBlobs(ctx context.Context, db KvStorage) error { + events := make(chan *Event) + + subBlobs := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "blob_sidecar"), events) + defer subBlobs.Close() + subHead := CreateSubscription(ctx, beacon.Endpoint(BeaconEventsPath, "head"), events) + defer subHead.Close() + + // Last seen head slot + // TODO: initialize with current head + var lastHead common.Slot + // Blobs we know about but haven't fetched yet + // TOOD: persist and load? + unfetched := map[common.Slot]int{} + + // blob_sidecar events are received after validation over gossip, and the blobs they reference + // will not be available until head is updated with the corresponding block. + timeout := 5 * time.Minute + for { + select { + case event := <-events: + switch string(event.Event) { + case "head": + var headEvent HeadEvent + if err := json.Unmarshal(event.Data, &headEvent); err != nil { + log.Error("Error unmarshalling event data", "error", err) + return err + } + log.Debug("Received head event", "slot", headEvent.Slot, "block", headEvent.Block) + if headEvent.Slot <= lastHead { + log.Error("Unexpected head slot", "slot", headEvent.Slot) + } + lastHead = headEvent.Slot + case "blob_sidecar": + var blobEvent BlobSidecarEvent + if err := json.Unmarshal(event.Data, &blobEvent); err != nil { + log.Error("Error unmarshalling event data", "error", err) + return err + } + log.Debug("Received blob_sidecar event", + "slot", blobEvent.Slot, + "block", blobEvent.BlockRoot, + "vhash", blobEvent.VersionedHash, + ) + unfetched[blobEvent.Slot]++ + } + + var fetched []common.Slot + for slot := range unfetched { + if slot <= lastHead { + resp, err := fetchBlobs(beacon, slot.String()) + if err != nil { + log.Error("Error fetching blobs", "error", err) + return err + } + if len(resp.Data) == 0 { + log.Error("No blobs in block", "slot", slot) + } + for _, blob := range resp.Data { + if err := pushBlob(ctx, &blob, db); err != nil { + return err + } + unfetched[slot]-- + } + } + if unfetched[slot] == 0 { + fetched = append(fetched, slot) + } + } + for _, slot := range fetched { + delete(unfetched, slot) + } + case <-time.After(timeout): + log.Debug("No events received", "for", timeout) + + case err := <-subBlobs.Err(): + if err != nil { + return err + } + case err := <-subHead.Err(): + if err != nil { + return err + } + case <-ctx.Done(): + log.Info("Context cancelled, exiting") + return nil + } + } +} + +func pushBlob(ctx context.Context, blob *BlobSidecar, db KvStorage) error { + vhash := blob.KzgCommitment.ToVersionedHash() + data, err := hex.DecodeString(blob.Blob[2:]) + if err != nil { + log.Error("Error decoding blob", "error", err) + return err + } + + log.Info("Storing blob", "vhash", vhash) + err = db.Set(ctx, BlobKeyFromHash(vhash), data, 0) + if err != nil { + log.Error("Error storing blob", "error", err) + return err + } + return nil +} + +func fetchBlobs(beacon *BeaconClient, blockId string) (*BlobSidecarsResponse, error) { + endpoint := beacon.Endpoint(BeaconBlobSidecarsPath, blockId) + log.Debug("Fetching blobs", "endpoint", endpoint) + resp, err := http.Get(endpoint) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var obj BlobSidecarsResponse + decoder := json.NewDecoder(resp.Body) + return &obj, decoder.Decode(&obj) +} diff --git a/pkg/storage.go b/pkg/storage.go new file mode 100644 index 0000000..3506f82 --- /dev/null +++ b/pkg/storage.go @@ -0,0 +1,43 @@ +package blob_indexer + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/protolambda/zrnt/eth2/beacon/common" +) + +func BlobKeyFromHash(vhash common.Hash32) string { + return "blob:" + vhash.String() +} + +// KvStorage represents a Redis-like KV store +type KvStorage interface { + Set(ctx context.Context, key string, value []byte, expiration time.Duration) error +} + +type RedisStorage struct { + rdb *redis.Client +} + +func NewRedisStorage(rdb *redis.Client) *RedisStorage { + return &RedisStorage{rdb: rdb} +} + +func (rs *RedisStorage) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + return rs.rdb.Set(ctx, key, value, expiration).Err() +} + +type MapStorage struct { + store map[string][]byte +} + +func NewMapStorage() *MapStorage { + return &MapStorage{store: make(map[string][]byte)} +} + +func (ms *MapStorage) Set(ctx context.Context, key string, value []byte, expiration time.Duration) error { + ms.store[key] = value + return nil +} diff --git a/scripts/integration-setup.sh b/scripts/integration-setup.sh new file mode 100755 index 0000000..af3f531 --- /dev/null +++ b/scripts/integration-setup.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# Builds and deploys a stack with only what we need. +# This script assumes it is running in the project root. + +set -e + +laconic_so="${LACONIC_SO:-laconic-so} --stack $(readlink -f test) --verbose" +config_dir=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}") + +# Prevent conflicting tty output +export BUILDKIT_PROGRESS=plain + +# By default assume we are running in the project root +export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-..}" +# The debugger can swallow error messages on panic +echo CERC_REMOTE_DEBUG=false >> $config_dir/stack.env + +set -x + +if [[ -z $SKIP_BUILD ]]; then + $laconic_so setup-repositories + $laconic_so build-containers +else + $laconic_so fetch-containers --image-registry gitea.local:5555 +fi + +$laconic_so deploy \ + --env-file $config_dir/stack.env \ + --cluster test up diff --git a/test/compose.yml b/test/compose.yml new file mode 100644 index 0000000..8047f75 --- /dev/null +++ b/test/compose.yml @@ -0,0 +1,23 @@ +services: + eth-blob-indexer: + image: cerc/eth-blob-indexer:local + restart: on-failure + depends_on: + eth-blob-db: + condition: service_healthy + command: + - "--beacon-addr=${ETH_BEACON_ADDR:-http://fixturenet-eth-lighthouse-1:8001}" + - "--redis-addr=${ETH_BLOBDB_ADDR:-eth-blob-db:6379}" + - "--log-level=debug" + + eth-blob-db: + image: redis:7 + restart: always + ports: + - 6379 + healthcheck: + test: [ "CMD", "redis-cli", "--raw", "incr", "ping" ] + +networks: + test_default: + external: true diff --git a/test/stack.yml b/test/stack.yml new file mode 100644 index 0000000..29ba702 --- /dev/null +++ b/test/stack.yml @@ -0,0 +1,15 @@ +# version: 1.2 +name: fixturenet-eth-blobs +description: "Ethereum Fixturenet with blob indexing" +repos: + - git.vdb.to/cerc-io/go-ethereum@v1.13.14 + - git.vdb.to/cerc-io/lighthouse +containers: + - cerc/go-ethereum + - cerc/lighthouse + - cerc/lighthouse-cli + - cerc/fixturenet-eth-genesis + - cerc/fixturenet-eth-geth + - cerc/fixturenet-eth-lighthouse +pods: + - fixturenet-eth