diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go new file mode 100644 index 00000000..49ea45f8 --- /dev/null +++ b/libraries/shared/streamer/statediff_streamer.go @@ -0,0 +1,39 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streamer + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/pkg/core" +) + +type Streamer interface { + Stream(chan statediff.Payload) (*rpc.ClientSubscription, error) +} + +type StateDiffStreamer struct { + client core.RpcClient +} + +func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + return streamer.client.Subscribe("statediff", payloadChan, "stream") +} + +func NewStateDiffStreamer(client core.RpcClient) StateDiffStreamer { + return StateDiffStreamer{ + client: client, + } +} diff --git a/libraries/shared/streamer/statediff_streamer_test.go b/libraries/shared/streamer/statediff_streamer_test.go new file mode 100644 index 00000000..97428eb8 --- /dev/null +++ b/libraries/shared/streamer/statediff_streamer_test.go @@ -0,0 +1,36 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streamer_test + +import ( + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +var _ = Describe("StateDiff Streamer", func() { + It("subscribes to the geth statediff service", func() { + client := &fakes.MockRpcClient{} + streamer := streamer.NewStateDiffStreamer(client) + payloadChan := make(chan statediff.Payload) + _, err := streamer.Stream(payloadChan) + Expect(err).NotTo(HaveOccurred()) + + client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) + }) +}) + diff --git a/libraries/shared/streamer/streamer_suite_test.go b/libraries/shared/streamer/streamer_suite_test.go new file mode 100644 index 00000000..2a3253e6 --- /dev/null +++ b/libraries/shared/streamer/streamer_suite_test.go @@ -0,0 +1,13 @@ +package streamer_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestStreamer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Streamer Suite") +}