diff --git a/cmd/capture.go b/cmd/capture.go index d81272d..32113a8 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -21,6 +21,7 @@ var ( dbPort int bcAddress string bcPort int + bcConnectionProtocol string maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second ) @@ -62,6 +63,7 @@ func init() { //// Beacon Client Specific captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)") + captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") err = captureCmd.MarkPersistentFlagRequired("bc.address") exitErr(err) err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -87,6 +89,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port")) exitErr(err) + err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) + exitErr(err) // Here you will define your flags and configuration settings. } diff --git a/cmd/head.go b/cmd/head.go index db98242..ca7e7e3 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -31,7 +31,7 @@ func startHeadTracking() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } diff --git a/go.mod b/go.mod index e8829d7..37e51b7 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/jackc/puddle v1.2.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lib/pq v1.10.4 // indirect + github.com/minio/sha256-simd v0.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect @@ -27,6 +28,7 @@ require ( ) require ( + github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/georgysavva/scany v0.3.0 github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/go.sum b/go.sum index 0b56ef7..0ffd6c1 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 h1:K2Bt7NSX8x/5MD2RiO7cPLy21dBgnQ84r9uR0QYoHrE= +github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747/go.mod h1:S8yiDeAXy8f88W4Ul+0dBMPx49S05byYbmZD6Uv94K4= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -112,6 +114,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -251,6 +255,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= +github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 3c71b7b..ce9d61a 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -56,11 +56,11 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st // // 2. Connect to the database. // -func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) { +func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - BC = beaconclient.CreateBeaconClient(ctx, bcAddress, bcPort) + BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort) log.Debug("Checking Beacon Client") err := BC.CheckBeaconClient() @@ -77,10 +77,10 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. -func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) { +func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { var err error for i := 0; i < maxRetry; i++ { - BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort) + BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index 9db307e..3cbf71c 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -10,38 +10,39 @@ import ( var _ = Describe("Boot", func() { var ( - dbAddress string = "localhost" - dbPort int = 8077 - dbName string = "vulcanize_testing" - dbUsername string = "vdbm" - dbPassword string = "password" - dbDriver string = "PGX" - bcAddress string = "localhost" - bcPort int = 5052 + dbAddress string = "localhost" + dbPort int = 8077 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) defer db.Close() Expect(err).To(BeNil()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) Expect(err).ToNot(BeNil()) }) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 8704220..44fce6c 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -13,14 +13,14 @@ var ( bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain - connectionProtocol = "http" + bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks + bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States ) // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. type BeaconClient struct { Context context.Context // A context generic context with multiple uses. - ServerAddress string // Address of the Beacon Server - ServerPort int // Port of the Beacon Server + ServerEndpoint string // What is the endpoint of the beacon server. PerformHeadTracking bool // Should we track head? PerformHistoricalProcessing bool // Should we perform historical processing? HeadTracking *SseEvents[Head] // Track the head block @@ -30,7 +30,7 @@ type BeaconClient struct { // A struct to keep track of relevant the head event topic. type SseEvents[P ProcessedEvents] struct { - Url string // The url for the subscription. Primarily used for logging + Endpoint string // The endpoint for the subscription. Primarily used for logging MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel ErrorCh chan *SseError // Contains any errors while SSE streaming occurred ProcessCh chan *P // Used to capture processed data in its proper struct. @@ -44,30 +44,30 @@ type SseError struct { } // A Function to create the BeaconClient. -func CreateBeaconClient(ctx context.Context, bcAddress string, bcPort int) *BeaconClient { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient { + endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) log.Info("Creating the BeaconClient") return &BeaconClient{ Context: ctx, - ServerAddress: bcAddress, - ServerPort: bcPort, - HeadTracking: createSseEvent[Head](connectionProtocol, bcAddress, bcPort, bcHeadTopicEndpoint), - ReOrgTracking: createSseEvent[ChainReorg](connectionProtocol, bcAddress, bcPort, bcReorgTopicEndpoint), - FinalizationTracking: createSseEvent[FinalizedCheckpoint](connectionProtocol, bcAddress, bcPort, bcFinalizedTopicEndpoint), + ServerEndpoint: endpoint, + HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint), + ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), + FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), } } // Create all the channels to handle a SSE events -func createSseEvent[P ProcessedEvents](connectionProtocol, bcAddress string, bcPort int, endpoint string) *SseEvents[P] { - url := fmt.Sprintf("%s://%s:%d%s", connectionProtocol, bcAddress, bcPort, endpoint) +func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEvents[P] { + endpoint := baseEndpoint + path sseEvents := &SseEvents[P]{ - Url: url, + Endpoint: endpoint, MessagesCh: make(chan *sse.Event), ErrorCh: make(chan *SseError), ProcessCh: make(chan *P), - SseClient: func(url string) *sse.Client { - log.WithFields(log.Fields{"url": url}).Info("Creating SSE client") - return sse.NewClient(url) - }(url), + SseClient: func(endpoint string) *sse.Client { + log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") + return sse.NewClient(endpoint) + }(endpoint), } return sseEvents } diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index d1ca6a1..a300175 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -1,8 +1,11 @@ +// This file will call all the functions to start and stop capturing the head of the beacon chain. + package beaconclient import ( "time" + "github.com/ferranbt/fastssz/spectests" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) @@ -10,10 +13,40 @@ import ( // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) CaptureHead() { log.Info("We are tracking the head of the chain.") - go bc.handleHead() - go bc.handleFinalizedCheckpoint() - go bc.handleReorgs() - bc.captureEventTopic() + bc.tempHelper() + // go bc.handleHead() + // go bc.handleFinalizedCheckpoint() + // go bc.handleReorgs() + // bc.captureEventTopic() +} + +// A temporary helper function to see the output of beacon block and states. +func (bc *BeaconClient) tempHelper() { + slot := "3200" + blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot + stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot + // Query + log.Info("Get") + blockSsz, _ := querySsz(blockEndpoint, slot) + stateSsz, _ := querySsz(stateEndpoint, slot) + // Transform + log.Info("Tranform") + stateObj := new(spectests.BeaconState) + err := stateObj.UnmarshalSSZ(stateSsz) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") + } + + blockObj := new(spectests.SignedBeaconBlock) + err = blockObj.UnmarshalSSZ(blockSsz) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") + } + + // Check + log.Info("Check") + log.Info("State Slot: ", stateObj.Slot) + log.Info("Block Slot: ", blockObj.Block.Slot) } // Stop the head tracking service. @@ -36,11 +69,11 @@ func (bc *BeaconClient) StopHeadTracking() error { // This function closes the SSE subscription, but waits until the MessagesCh is empty func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { - loghelper.LogUrl(se.Url).Info("Received a close event.") + loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") se.SseClient.Unsubscribe(se.MessagesCh) for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) } - loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown") + loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") finish <- true } diff --git a/pkg/beaconclient/healthcheck.go b/pkg/beaconclient/healthcheck.go index 342ccd1..c7f3fcd 100644 --- a/pkg/beaconclient/healthcheck.go +++ b/pkg/beaconclient/healthcheck.go @@ -3,7 +3,6 @@ package beaconclient import ( "fmt" "net/http" - "strconv" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" @@ -15,7 +14,7 @@ import ( // to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security func (bc BeaconClient) CheckBeaconClient() error { log.Debug("Attempting to connect to the beacon client") - bcEndpoint := "http://" + bc.ServerAddress + ":" + strconv.Itoa(bc.ServerPort) + bcHealthEndpoint + bcEndpoint := bc.ServerEndpoint + bcHealthEndpoint resp, err := http.Get(bcEndpoint) if err != nil { loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint) diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index cc255dc..88aab7e 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -10,8 +10,8 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC = beaconclient.CreateBeaconClient(context.Background(), "localhost", 5052) - errBc = beaconclient.CreateBeaconClient(context.Background(), "blah-blah", 1010) + BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052) + errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010) ) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { diff --git a/pkg/beaconclient/ssehandler.go b/pkg/beaconclient/incomingsse.go similarity index 86% rename from pkg/beaconclient/ssehandler.go rename to pkg/beaconclient/incomingsse.go index ac9d01f..73f3382 100644 --- a/pkg/beaconclient/ssehandler.go +++ b/pkg/beaconclient/incomingsse.go @@ -18,7 +18,7 @@ var ( // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { - loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages") + loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages") go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) for { select { @@ -30,9 +30,9 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { case headErr := <-eventHandler.ErrorCh: log.WithFields(log.Fields{ - "url": eventHandler.Url, - "err": headErr.err, - "msg": headErr.msg, + "endpoint": eventHandler.Endpoint, + "err": headErr.err, + "msg": headErr.msg, }, ).Error("Unable to handle event.") @@ -44,7 +44,6 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { // Turn the data object into a Struct. func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { - log.WithFields(log.Fields{"msg": msg}).Info("Processing a Message") var msgMarshaled P err := json.Unmarshal(msg, &msgMarshaled) if err != nil { @@ -55,7 +54,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan } return } - log.WithFields(log.Fields{"process": processCh}).Info("Processed") processCh <- &msgMarshaled } diff --git a/pkg/beaconclient/handleevents.go b/pkg/beaconclient/processevents.go similarity index 90% rename from pkg/beaconclient/handleevents.go rename to pkg/beaconclient/processevents.go index 08f715e..0b5b821 100644 --- a/pkg/beaconclient/handleevents.go +++ b/pkg/beaconclient/processevents.go @@ -1,3 +1,6 @@ +// This file contains all the functions to handle SSE events after they have been turned +// to the structs. + package beaconclient import log "github.com/sirupsen/logrus" diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go new file mode 100644 index 0000000..14dd0a1 --- /dev/null +++ b/pkg/beaconclient/queryserver.go @@ -0,0 +1,101 @@ +// This file will contain functions to query the Beacon Chain Server. + +package beaconclient + +import ( + "fmt" + "io/ioutil" + "net/http" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// Attempt to use generics.. +// // These are types that append slot at the end of the URL to handle a request. +// type SlotBasedRequests interface { +// *specs.BeaconState | *specs.SignedBeaconBlock +// UnmarshalSSZ([]byte) error +// } +// +// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) { +// obj := new(R) +// rawState, err := querySlot(endpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = &obj.UnmarshalSSZ(rawState) +// err = (*obj).UnmarshalSSZ(rawState) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) +// } +// return *obj, nil +// } + +// This function will query a state object based on the slot provided. +// The object is SSZ encoded. + +//type BeaconBlockResponse struct { +// version string `json: ` +//} + +// func queryState(endpoint string, slot string) (spectests.BeaconState, error) { +// obj := new(spectests.BeaconState) +// fullEndpoint := endpoint + slot +// rawState, err := querySsz(fullEndpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = obj.UnmarshalSSZ(rawState) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error()) +// } +// return *obj, nil +// } +// +// // This function will query a state object based on the slot provided. +// // The object is SSZ encoded. +// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) { +// obj := new(spectests.SignedBeaconBlock) +// fullEndpoint := endpoint + slot +// rawBlock, err := querySsz(fullEndpoint, slot) +// if err != nil { +// return *obj, err +// } +// +// err = obj.UnmarshalSSZ(rawBlock) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) +// } +// return *obj, nil +// } + +// A helper function to query endpoints that utilize slots. +func querySsz(endpoint string, slot string) ([]byte, error) { + log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint") + client := &http.Client{} + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to create a request!") + return nil, fmt.Errorf("Unable to create a request!: %s", err.Error()) + } + // Not set correctly + req.Header.Set("Accept", "application/octet-stream") + response, err := client.Do(req) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") + return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") + return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) + } + return body, nil +} diff --git a/pkg/loghelper/log_url.go b/pkg/loghelper/log_endpoint.go similarity index 70% rename from pkg/loghelper/log_url.go rename to pkg/loghelper/log_endpoint.go index 4a7d781..809bf62 100644 --- a/pkg/loghelper/log_url.go +++ b/pkg/loghelper/log_endpoint.go @@ -5,8 +5,8 @@ import ( ) // A simple helper function that will help wrap the error message. -func LogUrl(url string) *log.Entry { +func LogEndpoint(endpoint string) *log.Entry { return log.WithFields(log.Fields{ - "url": url, + "endpoint": endpoint, }) } diff --git a/pkg/loghelper/log_error.go b/pkg/loghelper/log_error.go index 41d0149..66305dd 100644 --- a/pkg/loghelper/log_error.go +++ b/pkg/loghelper/log_error.go @@ -11,3 +11,10 @@ func LogError(err error) *log.Entry { "err": err, }) } + +func LogSlotError(slot string, err error) *log.Entry { + return log.WithFields(log.Fields{ + "err": err, + "slot": slot, + }) +} diff --git a/tmp/ci/state b/tmp/ci/state new file mode 100644 index 0000000..c9fb317 Binary files /dev/null and b/tmp/ci/state differ diff --git a/tmp/code/block b/tmp/code/block new file mode 100644 index 0000000..34bea26 Binary files /dev/null and b/tmp/code/block differ diff --git a/tmp/code/state b/tmp/code/state new file mode 100644 index 0000000..c9fb317 Binary files /dev/null and b/tmp/code/state differ