199 lines
7.1 KiB
Go
199 lines
7.1 KiB
Go
|
package beaconclient_test
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
. "github.com/onsi/ginkgo/v2"
|
||
|
"github.com/sirupsen/logrus"
|
||
|
|
||
|
// . "github.com/onsi/gomega"
|
||
|
"github.com/r3labs/sse/v2"
|
||
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
|
||
|
)
|
||
|
|
||
|
type Message struct {
|
||
|
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
|
||
|
ReorgMessage beaconclient.ChainReorg // The reorg messsage that will be streamed to the BeaconClient
|
||
|
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
|
||
|
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
|
||
|
BeaconState string // The file path output of an SSZ encoded BeaconState.
|
||
|
SuccessfulDBQuery string // A string that indicates what a query to the DB should output to pass the test.
|
||
|
}
|
||
|
|
||
|
// TestEvents holds the test fixtures, keyed by a string identifier (the
// entries populated in this file use the slot number as the key).
var TestEvents map[string]*Message
|
||
|
|
||
|
var _ = Describe("Capturehead", func() {
|
||
|
TestEvents = map[string]*Message{
|
||
|
"100": {
|
||
|
HeadMessage: beaconclient.Head{
|
||
|
Slot: "100",
|
||
|
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
|
||
|
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
|
||
|
CurrentDutyDependentRoot: "",
|
||
|
PreviousDutyDependentRoot: "",
|
||
|
EpochTransition: false,
|
||
|
ExecutionOptimistic: false,
|
||
|
},
|
||
|
TestNotes: "This is a simple, easy to process block.",
|
||
|
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
|
||
|
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
|
||
|
},
|
||
|
"101": {
|
||
|
HeadMessage: beaconclient.Head{
|
||
|
Slot: "101",
|
||
|
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
|
||
|
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
|
||
|
CurrentDutyDependentRoot: "",
|
||
|
PreviousDutyDependentRoot: "",
|
||
|
EpochTransition: false,
|
||
|
ExecutionOptimistic: false,
|
||
|
},
|
||
|
TestNotes: "This is a simple, easy to process block.",
|
||
|
SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"),
|
||
|
BeaconState: filepath.Join("data", "101", "beacon-state.ssz"),
|
||
|
},
|
||
|
}
|
||
|
|
||
|
// We might also want to add an integration test that will actually process a single event, then end.
|
||
|
// This will help us know that our models match that actual data being served from the beacon node.
|
||
|
|
||
|
Describe("Receiving New Head SSE messages", Label("unit"), func() {
|
||
|
Context("Correctly formatted", Label("dry"), func() {
|
||
|
It("Should turn it into a struct successfully.", func() {
|
||
|
server := createSseServer()
|
||
|
logrus.Info("DONE!")
|
||
|
client := sse.NewClient("http://localhost:8080" + beaconclient.BcHeadTopicEndpoint)
|
||
|
|
||
|
logrus.Info("DONE!")
|
||
|
ch := make(chan *sse.Event)
|
||
|
go client.SubscribeChanRaw(ch)
|
||
|
|
||
|
time.Sleep(2 * time.Second)
|
||
|
logrus.Info("DONE!")
|
||
|
sendMessageToStream(server, []byte("hello"))
|
||
|
client.Unsubscribe(ch)
|
||
|
val := <-ch
|
||
|
|
||
|
logrus.Info("DONE!")
|
||
|
logrus.Info(val)
|
||
|
})
|
||
|
})
|
||
|
//Context("A single incorrectly formatted", func() {
|
||
|
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
|
||
|
//})
|
||
|
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
|
||
|
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
|
||
|
//})
|
||
|
})
|
||
|
|
||
|
//Describe("Receiving New Reorg SSE messages", Label("unit"), func() {
|
||
|
// Context("Reorg slot is already in the DB", func() {
|
||
|
// It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
|
||
|
// })
|
||
|
// Context("Multiple reorgs have occurred on this slot", func() {
|
||
|
// It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
|
||
|
// })
|
||
|
// Context("Reorg slot in not already in the DB", func() {
|
||
|
// It("Should simply have the correct slot in the DB.")
|
||
|
// })
|
||
|
|
||
|
//})
|
||
|
|
||
|
//Describe("Querying SignedBeaconBlock and Beacon State", Label("unit"), func() {
|
||
|
// Context("When the slot is properly served by the beacon node", func() {
|
||
|
// It("Should provide a successful response.")
|
||
|
// })
|
||
|
// Context("When there is a skipped slot", func() {
|
||
|
// It("Should indicate that the slot was skipped")
|
||
|
// // Future use case.
|
||
|
|
||
|
// })
|
||
|
// Context("When the slot is not properly served", func() {
|
||
|
// It("Should return an error, and add the slot to the knownGaps table.")
|
||
|
// })
|
||
|
//})
|
||
|
|
||
|
//Describe("Receiving properly formatted Head SSE events.", Label("unit"), func() {
|
||
|
// Context("In sequential order", func() {
|
||
|
// It("Should write each event to the DB successfully.")
|
||
|
// })
|
||
|
// Context("With gaps in slots", func() {
|
||
|
// It("Should add the slots in between to the knownGaps table")
|
||
|
// })
|
||
|
// Context("With a repeat slot", func() {
|
||
|
// It("Should recognize the reorg and process it.")
|
||
|
// })
|
||
|
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
|
||
|
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
|
||
|
// })
|
||
|
// Context("Out of order", func() {
|
||
|
// It("Not sure what it should do....")
|
||
|
// })
|
||
|
// Context("With a skipped slot", func() {
|
||
|
// It("Should recognize the slot as skipped and continue without error.")
|
||
|
// // Future use case
|
||
|
// })
|
||
|
//})
|
||
|
})
|
||
|
|
||
|
// Create a new Sse.Server.
|
||
|
func createSseServer() *sse.Server {
|
||
|
// server := sse.New()
|
||
|
// server.CreateStream("")
|
||
|
|
||
|
mux := http.NewServeMux()
|
||
|
//mux.HandleFunc(beaconclient.BcHeadTopicEndpoint, func(w http.ResponseWriter, r *http.Request) {
|
||
|
// go func() {
|
||
|
// // Received Browser Disconnection
|
||
|
// <-r.Context().Done()
|
||
|
// println("The client is disconnected here")
|
||
|
// return
|
||
|
// }()
|
||
|
|
||
|
// server.ServeHTTP(w, r)
|
||
|
//})
|
||
|
mux.HandleFunc(beaconclient.BcStateQueryEndpoint, provideState)
|
||
|
mux.HandleFunc(beaconclient.BcBlockQueryEndpoint, provideBlock)
|
||
|
go http.ListenAndServe(":8080", mux)
|
||
|
return server
|
||
|
}
|
||
|
|
||
|
// Send messages to the stream.
|
||
|
func sendMessageToStream(server *sse.Server, data []byte) {
|
||
|
server.Publish("", &sse.Event{
|
||
|
Data: data,
|
||
|
})
|
||
|
logrus.Info("publish complete")
|
||
|
}
|
||
|
|
||
|
// provideState mimics querying the state from the beacon node. The slot is
// taken from the last segment of the request path and the matching SSZ file,
// data/<slot>/beacon-state.ssz (relative to the working directory), is
// returned as application/octet-stream.
//
// If the file cannot be read the handler answers 404 with a plain-text
// message and writes nothing else.
func provideState(w http.ResponseWriter, req *http.Request) {
	segments := strings.Split(req.URL.Path, "/")
	slot := segments[len(segments)-1]
	slotFile := filepath.Join("data", slot, "beacon-state.ssz")

	dat, err := os.ReadFile(slotFile)
	if err != nil {
		// Stop here: falling through would send a 200 with an error string
		// as the body and then attempt to write nil data.
		http.Error(w, fmt.Sprintf("Can't find the slot file, %s", slotFile), http.StatusNotFound)
		return
	}

	// Headers must be set before the first byte of the body is written.
	w.Header().Set("Content-Type", "application/octet-stream")
	// A failed write means the client went away; there is nobody left to
	// report the error to, so it is deliberately ignored.
	_, _ = w.Write(dat)
}
|
||
|
|
||
|
// provideBlock mimics querying a signed block from the beacon node. The slot
// is taken from the last segment of the request path and the matching SSZ
// file, data/<slot>/signed-beacon-block.ssz (relative to the working
// directory), is returned as application/octet-stream.
//
// If the file cannot be read the handler answers 404 with a plain-text
// message and writes nothing else.
func provideBlock(w http.ResponseWriter, req *http.Request) {
	segments := strings.Split(req.URL.Path, "/")
	slot := segments[len(segments)-1]
	slotFile := filepath.Join("data", slot, "signed-beacon-block.ssz")

	dat, err := os.ReadFile(slotFile)
	if err != nil {
		// Stop here: falling through would send a 200 with an error string
		// as the body and then attempt to write nil data.
		http.Error(w, fmt.Sprintf("Can't find the slot file, %s", slotFile), http.StatusNotFound)
		return
	}

	// Headers must be set before the first byte of the body is written.
	w.Header().Set("Content-Type", "application/octet-stream")
	// A failed write means the client went away; there is nobody left to
	// report the error to, so it is deliberately ignored.
	_, _ = w.Write(dat)
}
|