Feature/22 test handling incoming events #30
9
.github/workflows/on-pr.yml
vendored
9
.github/workflows/on-pr.yml
vendored
@ -22,7 +22,7 @@ on:
|
|||||||
|
|
||||||
env:
|
env:
|
||||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}}
|
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}}
|
||||||
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
|
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }}
|
||||||
GOPATH: /tmp/go
|
GOPATH: /tmp/go
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
@ -102,6 +102,13 @@ jobs:
|
|||||||
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
|
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
|
||||||
cat ./config.sh
|
cat ./config.sh
|
||||||
|
|
||||||
|
- name: Run docker compose
|
||||||
|
run: |
|
||||||
|
docker-compose \
|
||||||
|
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \
|
||||||
|
--env-file ./config.sh \
|
||||||
|
up -d --build
|
||||||
|
|
||||||
- uses: actions/setup-go@v3
|
- uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: ">=1.17.0"
|
go-version: ">=1.17.0"
|
||||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@ -4,3 +4,4 @@ ipld-ethcl-indexer.log
|
|||||||
report.json
|
report.json
|
||||||
cover.profile
|
cover.profile
|
||||||
temp/*
|
temp/*
|
||||||
|
ipld-ethcl-indexer/pkg/beaconclient/data/*
|
@ -58,9 +58,16 @@ This project utilizes `ginkgo` for testing. A few notes on testing:
|
|||||||
- All test packages are named `{base_package}_test`. This ensures we only test the public methods.
|
- All test packages are named `{base_package}_test`. This ensures we only test the public methods.
|
||||||
- If there is a need to test a private method, please include why in the testing file.
|
- If there is a need to test a private method, please include why in the testing file.
|
||||||
- Unit tests must contain the `Label("unit")`.
|
- Unit tests must contain the `Label("unit")`.
|
||||||
- Unit tests should not rely on any running service. If a running service is needed. Utilize an integration test.
|
- Unit tests should not rely on any running service (except for a postgres DB). If a running service is needed. Utilize an integration test.
|
||||||
- Integration tests must contain the `Label("integration")`.
|
- Integration tests must contain the `Label("integration")`.
|
||||||
|
|
||||||
|
### Testing the `pkg/beaconclient`
|
||||||
|
|
||||||
|
To test the `/pkg/beaconclient`, you will need to download data locally.
|
||||||
|
|
||||||
|
1. [Install Minio](https://docs.min.io/minio/baremetal/quickstart/quickstart.html), you only need the client, `mc`.
|
||||||
|
2. Run: `mc cp`
|
||||||
|
|
||||||
# Contribution
|
# Contribution
|
||||||
|
|
||||||
If you want to contribute please make sure you do the following:
|
If you want to contribute please make sure you do the following:
|
||||||
|
@ -25,3 +25,15 @@ This package will contain code to interact with the beacon client.
|
|||||||
## `pkg/version`
|
## `pkg/version`
|
||||||
|
|
||||||
A generic package which can be utilized to easily version our applications.
|
A generic package which can be utilized to easily version our applications.
|
||||||
|
|
||||||
|
## `pkg/gracefulshutdown`
|
||||||
|
|
||||||
|
A generic package that can be used to shutdown various services within an application.
|
||||||
|
|
||||||
|
## `pkg/loghelper`
|
||||||
|
|
||||||
|
This package contains useful functions for logging.
|
||||||
|
|
||||||
|
## `internal/shutdown`
|
||||||
|
|
||||||
|
This package is used to shutdown the `ipld-ethcl-indexer`. It calls the `pkg/gracefulshutdown` package.
|
||||||
|
1
go.mod
1
go.mod
@ -43,6 +43,7 @@ require (
|
|||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||||
github.com/jackc/pgx/v4 v4.16.0
|
github.com/jackc/pgx/v4 v4.16.0
|
||||||
|
github.com/jarcoal/httpmock v1.2.0
|
||||||
github.com/julienschmidt/httprouter v1.3.0
|
github.com/julienschmidt/httprouter v1.3.0
|
||||||
github.com/magiconair/properties v1.8.6 // indirect
|
github.com/magiconair/properties v1.8.6 // indirect
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -346,6 +346,8 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv
|
|||||||
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
|
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
|
||||||
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
|
||||||
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
|
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
|
||||||
|
github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=
|
||||||
|
github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk=
|
||||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||||
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||||
|
@ -1,19 +1,24 @@
|
|||||||
package beaconclient_test
|
package beaconclient_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jarcoal/httpmock"
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/r3labs/sse"
|
||||||
|
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
// . "github.com/onsi/gomega"
|
|
||||||
"github.com/r3labs/sse/v2"
|
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
@ -22,65 +27,83 @@ type Message struct {
|
|||||||
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
|
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
|
||||||
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
|
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
|
||||||
BeaconState string // The file path output of an SSZ encoded BeaconState.
|
BeaconState string // The file path output of an SSZ encoded BeaconState.
|
||||||
SuccessfulDBQuery string // A string that indicates what a query to the DB should output to pass the test.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var TestEvents map[string]*Message
|
|
||||||
|
|
||||||
var _ = Describe("Capturehead", func() {
|
var _ = Describe("Capturehead", func() {
|
||||||
TestEvents = map[string]*Message{
|
|
||||||
"100": {
|
var (
|
||||||
HeadMessage: beaconclient.Head{
|
BC beaconclient.BeaconClient
|
||||||
Slot: "100",
|
DB sql.Database
|
||||||
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
|
err error
|
||||||
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
|
address string = "localhost"
|
||||||
CurrentDutyDependentRoot: "",
|
port int = 8080
|
||||||
PreviousDutyDependentRoot: "",
|
protocol string = "http"
|
||||||
EpochTransition: false,
|
TestEvents map[string]*Message
|
||||||
ExecutionOptimistic: false,
|
dbHost string = "localhost"
|
||||||
|
dbPort int = 8077
|
||||||
|
dbName string = "vulcanize_testing"
|
||||||
|
dbUser string = "vdbm"
|
||||||
|
dbPassword string = "password"
|
||||||
|
dbDriver string = "pgx"
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
TestEvents = map[string]*Message{
|
||||||
|
"100": {
|
||||||
|
HeadMessage: beaconclient.Head{
|
||||||
|
Slot: "100",
|
||||||
|
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
|
||||||
|
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
|
||||||
|
CurrentDutyDependentRoot: "",
|
||||||
|
PreviousDutyDependentRoot: "",
|
||||||
|
EpochTransition: false,
|
||||||
|
ExecutionOptimistic: false,
|
||||||
|
},
|
||||||
|
TestNotes: "This is a simple, easy to process block.",
|
||||||
|
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
|
||||||
|
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
|
||||||
},
|
},
|
||||||
TestNotes: "This is a simple, easy to process block.",
|
"101": {
|
||||||
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
|
HeadMessage: beaconclient.Head{
|
||||||
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
|
Slot: "101",
|
||||||
},
|
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
|
||||||
"101": {
|
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
|
||||||
HeadMessage: beaconclient.Head{
|
CurrentDutyDependentRoot: "",
|
||||||
Slot: "101",
|
PreviousDutyDependentRoot: "",
|
||||||
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
|
EpochTransition: false,
|
||||||
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
|
ExecutionOptimistic: false,
|
||||||
CurrentDutyDependentRoot: "",
|
},
|
||||||
PreviousDutyDependentRoot: "",
|
TestNotes: "This is a simple, easy to process block.",
|
||||||
EpochTransition: false,
|
SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"),
|
||||||
ExecutionOptimistic: false,
|
BeaconState: filepath.Join("data", "101", "beacon-state.ssz"),
|
||||||
},
|
},
|
||||||
TestNotes: "This is a simple, easy to process block.",
|
}
|
||||||
SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"),
|
SetupBeaconNodeMock(TestEvents, protocol, address, port)
|
||||||
BeaconState: filepath.Join("data", "101", "beacon-state.ssz"),
|
BC = *beaconclient.CreateBeaconClient(context.Background(), protocol, address, port)
|
||||||
},
|
DB, err = postgres.SetupPostgresDb(dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
|
||||||
}
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
clearEthclDbTables(DB)
|
||||||
|
// Drop all records from the DB.
|
||||||
|
BC.Db = DB
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
// We might also want to add an integration test that will actually process a single event, then end.
|
// We might also want to add an integration test that will actually process a single event, then end.
|
||||||
// This will help us know that our models match that actual data being served from the beacon node.
|
// This will help us know that our models match that actual data being served from the beacon node.
|
||||||
|
|
||||||
Describe("Receiving New Head SSE messages", Label("unit"), func() {
|
Describe("Receiving New Head SSE messages", Label("unit"), func() {
|
||||||
Context("Correctly formatted", Label("dry"), func() {
|
Context("Correctly formatted", func() {
|
||||||
It("Should turn it into a struct successfully.", func() {
|
It("Should turn it into a struct successfully.", func() {
|
||||||
server := createSseServer()
|
go BC.CaptureHead()
|
||||||
logrus.Info("DONE!")
|
sendHeadMessage(BC, TestEvents["100"].HeadMessage)
|
||||||
client := sse.NewClient("http://localhost:8080" + beaconclient.BcHeadTopicEndpoint)
|
|
||||||
|
|
||||||
logrus.Info("DONE!")
|
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(DB, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
|
||||||
ch := make(chan *sse.Event)
|
Expect(slot).To(Equal(100))
|
||||||
go client.SubscribeChanRaw(ch)
|
Expect(epoch).To(Equal(3))
|
||||||
|
Expect(blockRoot).To(Equal(TestEvents["100"].HeadMessage.Block))
|
||||||
|
Expect(stateRoot).To(Equal(TestEvents["100"].HeadMessage.State))
|
||||||
|
Expect(status).To(Equal("proposed"))
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
logrus.Info("DONE!")
|
|
||||||
sendMessageToStream(server, []byte("hello"))
|
|
||||||
client.Unsubscribe(ch)
|
|
||||||
val := <-ch
|
|
||||||
|
|
||||||
logrus.Info("DONE!")
|
|
||||||
logrus.Info(val)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
//Context("A single incorrectly formatted", func() {
|
//Context("A single incorrectly formatted", func() {
|
||||||
@ -141,58 +164,100 @@ var _ = Describe("Capturehead", func() {
|
|||||||
//})
|
//})
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create a new Sse.Server.
|
// Wrapper function to send a head message to the beaconclient
|
||||||
func createSseServer() *sse.Server {
|
func sendHeadMessage(bc beaconclient.BeaconClient, head beaconclient.Head) {
|
||||||
// server := sse.New()
|
|
||||||
// server.CreateStream("")
|
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
data, err := json.Marshal(head)
|
||||||
//mux.HandleFunc(beaconclient.BcHeadTopicEndpoint, func(w http.ResponseWriter, r *http.Request) {
|
Expect(err).ToNot(HaveOccurred())
|
||||||
// go func() {
|
|
||||||
// // Received Browser Disconnection
|
|
||||||
// <-r.Context().Done()
|
|
||||||
// println("The client is disconnected here")
|
|
||||||
// return
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// server.ServeHTTP(w, r)
|
bc.HeadTracking.MessagesCh <- &sse.Event{
|
||||||
//})
|
ID: []byte{},
|
||||||
mux.HandleFunc(beaconclient.BcStateQueryEndpoint, provideState)
|
Data: data,
|
||||||
mux.HandleFunc(beaconclient.BcBlockQueryEndpoint, provideBlock)
|
Event: []byte{},
|
||||||
go http.ListenAndServe(":8080", mux)
|
Retry: []byte{},
|
||||||
return server
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send messages to the stream.
|
// A helper function to query the ethcl.slots table based on the slot and block_root
|
||||||
func sendMessageToStream(server *sse.Server, data []byte) {
|
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, int, string, string, string) {
|
||||||
server.Publish("", &sse.Event{
|
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
|
||||||
Data: data,
|
var epoch, slot int
|
||||||
})
|
var blockRoot, stateRoot, status string
|
||||||
logrus.Info("publish complete")
|
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
|
||||||
|
err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
return epoch, slot, blockRoot, stateRoot, status
|
||||||
|
}
|
||||||
|
|
||||||
|
// A function that will remove all entries from the ethcl tables for you.
|
||||||
|
func clearEthclDbTables(db sql.Database) {
|
||||||
|
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"}
|
||||||
|
for _, queries := range deleteQueries {
|
||||||
|
_, err := db.Exec(context.Background(), queries)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestBeaconNode struct {
|
||||||
|
TestEvents map[string]*Message
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new new mock for the beacon node.
|
||||||
|
func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, address string, port int) {
|
||||||
|
httpmock.Activate()
|
||||||
|
testBC := &TestBeaconNode{
|
||||||
|
TestEvents: TestEvents,
|
||||||
|
}
|
||||||
|
|
||||||
|
stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z`
|
||||||
|
httpmock.RegisterResponder("GET", stateUrl,
|
||||||
|
func(req *http.Request) (*http.Response, error) {
|
||||||
|
// Get ID from request
|
||||||
|
id := httpmock.MustGetSubmatch(req, 1)
|
||||||
|
dat, err := testBC.provideSsz(id, "state")
|
||||||
|
if err != nil {
|
||||||
|
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
|
||||||
|
}
|
||||||
|
return httpmock.NewBytesResponse(200, dat), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
blockUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcBlockQueryEndpoint + `([^/]+)\z`
|
||||||
|
httpmock.RegisterResponder("GET", blockUrl,
|
||||||
|
func(req *http.Request) (*http.Response, error) {
|
||||||
|
// Get ID from request
|
||||||
|
id := httpmock.MustGetSubmatch(req, 1)
|
||||||
|
dat, err := testBC.provideSsz(id, "block")
|
||||||
|
if err != nil {
|
||||||
|
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
|
||||||
|
}
|
||||||
|
return httpmock.NewBytesResponse(200, dat), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
|
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
|
||||||
func provideState(w http.ResponseWriter, req *http.Request) {
|
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string) ([]byte, error) {
|
||||||
path := strings.Split(req.URL.Path, "/")
|
var slotFile string
|
||||||
slot := path[len(path)-1]
|
for _, val := range tbc.TestEvents {
|
||||||
slotFile := "data/" + slot + "/beacon-state.ssz"
|
if sszIdentifier == "state" {
|
||||||
|
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
|
||||||
|
slotFile = val.BeaconState
|
||||||
|
}
|
||||||
|
} else if sszIdentifier == "block" {
|
||||||
|
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
|
||||||
|
slotFile = val.SignedBeaconBlock
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if slotFile == "" {
|
||||||
|
return nil, fmt.Errorf("We couldn't find the slot file for %s", slotIdentifier)
|
||||||
|
}
|
||||||
dat, err := os.ReadFile(slotFile)
|
dat, err := os.ReadFile(slotFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(w, "Can't find the slot file, %s", slotFile)
|
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
return dat, nil
|
||||||
w.Write(dat)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
|
|
||||||
func provideBlock(w http.ResponseWriter, req *http.Request) {
|
|
||||||
path := strings.Split(req.URL.Path, "/")
|
|
||||||
slot := path[len(path)-1]
|
|
||||||
slotFile := "data/" + slot + "/signed-beacon-block.ssz"
|
|
||||||
dat, err := os.ReadFile(slotFile)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintf(w, "Can't find the slot file, %s", slotFile)
|
|
||||||
}
|
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
|
||||||
w.Write(dat)
|
|
||||||
}
|
}
|
||||||
|
@ -19,23 +19,26 @@ var (
|
|||||||
// When new messages come in, it will ensure that they are decoded into JSON.
|
// When new messages come in, it will ensure that they are decoded into JSON.
|
||||||
// If any errors occur, it log the error information.
|
// If any errors occur, it log the error information.
|
||||||
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
|
||||||
errG := new(errgroup.Group)
|
go func() {
|
||||||
errG.Go(func() error {
|
errG := new(errgroup.Group)
|
||||||
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
errG.Go(func() error {
|
||||||
if err != nil {
|
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err := errG.Wait(); err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"err": err,
|
||||||
|
"endpoint": eventHandler.Endpoint,
|
||||||
|
}).Error("Unable to subscribe to the SSE endpoint.")
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
})
|
}()
|
||||||
if err := errG.Wait(); err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"err": err,
|
|
||||||
"endpoint": eventHandler.Endpoint,
|
|
||||||
}).Error("Unable to subscribe to the SSE endpoint.")
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case message := <-eventHandler.MessagesCh:
|
case message := <-eventHandler.MessagesCh:
|
||||||
|
Loading…
Reference in New Issue
Block a user