diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go
new file mode 100644
index 00000000..858f8442
--- /dev/null
+++ b/pkg/ipfs/api.go
@@ -0,0 +1,82 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package ipfs
+
+import (
+ "context"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// APIName is the namespace used for the state diffing service API
+const APIName = "vulcanizedb"
+
+// APIVersion is the version of the state diffing service API
+const APIVersion = "0.0.1"
+
+// PublicSeedNodeAPI
+type PublicSeedNodeAPI struct {
+ snp SyncPublishAndServe
+}
+
+// NewPublicSeedNodeAPI
+func NewPublicSeedNodeAPI(snp SyncPublishAndServe) *PublicSeedNodeAPI {
+ return &PublicSeedNodeAPI{
+ snp: snp,
+ }
+}
+
+// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created
+func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, params *Params) (*rpc.Subscription, error) {
+ // ensure that the RPC connection supports subscriptions
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return nil, rpc.ErrNotificationsUnsupported
+ }
+
+ // create subscription and start waiting for statediff events
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ // subscribe to events from the state diff service
+ payloadChannel := make(chan ResponsePayload)
+ quitChan := make(chan bool)
+ api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
+
+ // loop and await state diff payloads and relay them to the subscriber with then notifier
+ for {
+ select {
+ case packet := <-payloadChannel:
+ if err := notifier.Notify(rpcSub.ID, packet); err != nil {
+ log.Error("Failed to send state diff packet", "err", err)
+ }
+ case <-rpcSub.Err():
+ err := api.snp.Unsubscribe(rpcSub.ID)
+ if err != nil {
+ log.Error("Failed to unsubscribe from the state diff service", err)
+ }
+ return
+ case <-quitChan:
+ // don't need to unsubscribe, statediff service does so before sending the quit signal
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
diff --git a/pkg/ipfs/processor.go b/pkg/ipfs/processor.go
deleted file mode 100644
index 9678d73c..00000000
--- a/pkg/ipfs/processor.go
+++ /dev/null
@@ -1,109 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package ipfs
-
-import (
- "sync"
-
- "github.com/ethereum/go-ethereum/statediff"
- log "github.com/sirupsen/logrus"
-
- "github.com/vulcanize/vulcanizedb/pkg/core"
- "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
-)
-
-const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
-
-// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
-// This is the top-level interface used by the syncAndPublish command
-type SyncAndPublish interface {
- Process(wg *sync.WaitGroup) error
-}
-
-// Processor is the underlying struct for the SyncAndPublish interface
-type Processor struct {
- // Interface for streaming statediff payloads over a geth rpc subscription
- Streamer StateDiffStreamer
- // Interface for converting statediff payloads into ETH-IPLD object payloads
- Converter PayloadConverter
- // Interface for publishing the ETH-IPLD payloads to IPFS
- Publisher IPLDPublisher
- // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
- Repository CIDRepository
- // Chan the processor uses to subscribe to state diff payloads from the Streamer
- PayloadChan chan statediff.Payload
- // Chan used to shut down the Processor
- QuitChan chan bool
-}
-
-// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
-func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncAndPublish, error) {
- publisher, err := NewIPLDPublisher(ipfsPath)
- if err != nil {
- return nil, err
- }
- return &Processor{
- Streamer: NewStateDiffStreamer(rpcClient),
- Repository: NewCIDRepository(db),
- Converter: NewPayloadConverter(ethClient),
- Publisher: publisher,
- PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
- QuitChan: qc,
- }, nil
-}
-
-// Process is the main processing loop
-func (i *Processor) Process(wg *sync.WaitGroup) error {
- sub, err := i.Streamer.Stream(i.PayloadChan)
- if err != nil {
- return err
- }
- wg.Add(1)
- go func() {
- for {
- select {
- case payload := <-i.PayloadChan:
- if payload.Err != nil {
- log.Error(err)
- continue
- }
- ipldPayload, err := i.Converter.Convert(payload)
- if err != nil {
- log.Error(err)
- continue
- }
- cidPayload, err := i.Publisher.Publish(ipldPayload)
- if err != nil {
- log.Error(err)
- continue
- }
- err = i.Repository.Index(cidPayload)
- if err != nil {
- log.Error(err)
- }
- case err = <-sub.Err():
- log.Error(err)
- case <-i.QuitChan:
- log.Info("quiting IPFSProcessor")
- wg.Done()
- return
- }
- }
- }()
-
- return nil
-}
diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go
new file mode 100644
index 00000000..9e0b2eed
--- /dev/null
+++ b/pkg/ipfs/service.go
@@ -0,0 +1,240 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package ipfs
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/statediff"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vulcanize/vulcanizedb/pkg/core"
+ "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
+)
+
+const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
+
+// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
+// This is the top-level interface used by the syncAndPublish command
+type SyncPublishAndServe interface {
+ // APIs(), Protocols(), Start() and Stop()
+ node.Service
+ // Main event loop for syncAndPublish processes
+ SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
+ // Main event loop for handling client pub-sub
+ Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
+ // Method to subscribe to receive state diff processing output
+ Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params)
+ // Method to unsubscribe from state diff processing
+ Unsubscribe(id rpc.ID) error
+}
+
+// Processor is the underlying struct for the SyncAndPublish interface
+type Service struct {
+ // Used to sync access to the Subscriptions
+ sync.Mutex
+ // Interface for streaming statediff payloads over a geth rpc subscription
+ Streamer StateDiffStreamer
+ // Interface for converting statediff payloads into ETH-IPLD object payloads
+ Converter PayloadConverter
+ // Interface for publishing the ETH-IPLD payloads to IPFS
+ Publisher IPLDPublisher
+ // Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
+ Repository CIDRepository
+ // Chan the processor uses to subscribe to state diff payloads from the Streamer
+ PayloadChan chan statediff.Payload
+ // Used to signal shutdown of the service
+ QuitChan chan bool
+ // A mapping of rpc.IDs to their subscription channels
+ Subscriptions map[rpc.ID]Subscription
+}
+
+// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
+func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishAndServe, error) {
+ publisher, err := NewIPLDPublisher(ipfsPath)
+ if err != nil {
+ return nil, err
+ }
+ return &Service{
+ Streamer: NewStateDiffStreamer(rpcClient),
+ Repository: NewCIDRepository(db),
+ Converter: NewPayloadConverter(ethClient),
+ Publisher: publisher,
+ PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
+ QuitChan: qc,
+ }, nil
+}
+
+// Protocols exports the services p2p protocols, this service has none
+func (sap *Service) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{}
+}
+
+// APIs returns the RPC descriptors the StateDiffingService offers
+func (sap *Service) APIs() []rpc.API {
+ return []rpc.API{
+ {
+ Namespace: APIName,
+ Version: APIVersion,
+ Service: NewPublicSeedNodeAPI(sap),
+ Public: true,
+ },
+ }
+}
+
+// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
+// It then forwards the data to the Serve() loop which filters and sends relevent data to client subscriptions
+func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error {
+ sub, err := sap.Streamer.Stream(sap.PayloadChan)
+ if err != nil {
+ return err
+ }
+ wg.Add(1)
+ go func() {
+ for {
+ select {
+ case payload := <-sap.PayloadChan:
+ if payload.Err != nil {
+ log.Error(err)
+ continue
+ }
+ ipldPayload, err := sap.Converter.Convert(payload)
+ if err != nil {
+ log.Error(err)
+ continue
+ }
+ select {
+ case forwardPayloadChan <- *ipldPayload:
+ default:
+ }
+ cidPayload, err := sap.Publisher.Publish(ipldPayload)
+ if err != nil {
+ log.Error(err)
+ continue
+ }
+ err = sap.Repository.Index(cidPayload)
+ if err != nil {
+ log.Error(err)
+ }
+ case err = <-sub.Err():
+ log.Error(err)
+ case <-sap.QuitChan:
+ select {
+ case forwardQuitchan <- true:
+ default:
+ }
+ log.Info("quiting SyncAndPublish process")
+ wg.Done()
+ return
+ }
+ }
+ }()
+
+ return nil
+}
+
+func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
+ wg.Add(1)
+ go func() {
+ for {
+ select {
+ case payload := <-receivePayloadChan:
+ println(payload.BlockNumber.Int64())
+ // Method for using subscription parameters to filter payload and stream relevent info to sub channel
+ case <-receiveQuitchan:
+ log.Info("quiting Serve process")
+ wg.Done()
+ return
+ }
+ }
+ }()
+}
+
+// Subscribe is used by the API to subscribe to the StateDiffingService loop
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params) {
+ log.Info("Subscribing to the statediff service")
+ sap.Lock()
+ sap.Subscriptions[id] = Subscription{
+ PayloadChan: sub,
+ QuitChan: quitChan,
+ }
+ sap.Unlock()
+}
+
+// Unsubscribe is used to unsubscribe to the StateDiffingService loop
+func (sap *Service) Unsubscribe(id rpc.ID) error {
+ log.Info("Unsubscribing from the statediff service")
+ sap.Lock()
+ _, ok := sap.Subscriptions[id]
+ if !ok {
+ return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
+ }
+ delete(sap.Subscriptions, id)
+ sap.Unlock()
+ return nil
+}
+
+// Start is used to begin the StateDiffingService
+func (sap *Service) Start(*p2p.Server) error {
+ log.Info("Starting statediff service")
+ wg := new(sync.WaitGroup)
+ payloadChan := make(chan IPLDPayload)
+ quitChan := make(chan bool)
+ go sap.SyncAndPublish(wg, payloadChan, quitChan)
+ go sap.Serve(wg, payloadChan, quitChan)
+ return nil
+}
+
+// Stop is used to close down the StateDiffingService
+func (sap *Service) Stop() error {
+ log.Info("Stopping statediff service")
+ close(sap.QuitChan)
+ return nil
+}
+
+// send is used to fan out and serve a payload to any subscriptions
+func (sap *Service) send(payload ResponsePayload) {
+ sap.Lock()
+ for id, sub := range sap.Subscriptions {
+ select {
+ case sub.PayloadChan <- payload:
+ log.Infof("sending state diff payload to subscription %s", id)
+ default:
+ log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
+ }
+ }
+ sap.Unlock()
+}
+
+// close is used to close all listening subscriptions
+func (sap *Service) close() {
+ sap.Lock()
+ for id, sub := range sap.Subscriptions {
+ select {
+ case sub.QuitChan <- true:
+ delete(sap.Subscriptions, id)
+ log.Infof("closing subscription %s", id)
+ default:
+ log.Infof("unable to close subscription %s; channel has no receiver", id)
+ }
+ }
+ sap.Unlock()
+}
diff --git a/pkg/ipfs/processor_test.go b/pkg/ipfs/service_test.go
similarity index 94%
rename from pkg/ipfs/processor_test.go
rename to pkg/ipfs/service_test.go
index a5664078..40dbce90 100644
--- a/pkg/ipfs/processor_test.go
+++ b/pkg/ipfs/service_test.go
@@ -30,9 +30,9 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
)
-var _ = Describe("Processor", func() {
+var _ = Describe("Service", func() {
- Describe("Process", func() {
+ Describe("Loop", func() {
It("Streams StatediffPayloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan statediff.Payload, 1)
@@ -55,7 +55,7 @@ var _ = Describe("Processor", func() {
ReturnIPLDPayload: &test_helpers.MockIPLDPayload,
ReturnErr: nil,
}
- processor := &ipfs.Processor{
+ processor := &ipfs.Service{
Repository: mockCidRepo,
Publisher: mockPublisher,
Streamer: mockStreamer,
@@ -63,7 +63,7 @@ var _ = Describe("Processor", func() {
PayloadChan: payloadChan,
QuitChan: quitChan,
}
- err := processor.Process(wg)
+ err := processor.SyncAndPublish(wg, nil, nil)
Expect(err).ToNot(HaveOccurred())
time.Sleep(2 * time.Second)
quitChan <- true
diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go
index ef53d8ec..228cf85c 100644
--- a/pkg/ipfs/types.go
+++ b/pkg/ipfs/types.go
@@ -17,12 +17,49 @@
package ipfs
import (
+ "encoding/json"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
+//
+type Subscription struct {
+ PayloadChan chan<- ResponsePayload
+ QuitChan chan<- bool
+}
+
+type ResponsePayload struct {
+ HeadersRlp [][]byte `json:"headersRlp"`
+ UnclesRlp [][]byte `json:"unclesRlp"`
+ TransactionsRlp [][]byte `json:"transactionsRlp"`
+ ReceiptsRlp [][]byte `json:"receiptsRlp"`
+ StateNodesRlp [][]byte `json:"stateNodesRlp"`
+ StorageNodesRlp [][]byte `json:"storageNodesRlp"`
+
+ encoded []byte
+ err error
+}
+
+func (sd *ResponsePayload) ensureEncoded() {
+ if sd.encoded == nil && sd.err == nil {
+ sd.encoded, sd.err = json.Marshal(sd)
+ }
+}
+
+// Length to implement Encoder interface for StateDiff
+func (sd *ResponsePayload) Length() int {
+ sd.ensureEncoded()
+ return len(sd.encoded)
+}
+
+// Encode to implement Encoder interface for StateDiff
+func (sd *ResponsePayload) Encode() ([]byte, error) {
+ sd.ensureEncoded()
+ return sd.encoded, sd.err
+}
+
// IPLDPayload is a custom type which packages ETH data for the IPFS publisher
type IPLDPayload struct {
HeaderRLP []byte
@@ -82,3 +119,41 @@ type TrxMetaData struct {
To string
From string
}
+
+// Params are set by the client to tell the server how to filter that is fed into their subscription
+type Params struct {
+ HeaderFilter struct {
+ Off bool
+ StartingBlock int64
+ EndingBlock int64 // set to 0 or a negative value to have no ending block
+ Uncles bool
+ }
+ TrxFilter struct {
+ Off bool
+ StartingBlock int64
+ EndingBlock int64
+ Src string
+ Dst string
+ }
+ ReceiptFilter struct {
+ Off bool
+ StartingBlock int64
+ EndingBlock int64
+ Topic0s []string
+ }
+ StateFilter struct {
+ Off bool
+ StartingBlock int64
+ EndingBlock int64
+ Address string // is converted to state key by taking its keccak256 hash
+ LeafsOnly bool
+ }
+ StorageFilter struct {
+ Off bool
+ StartingBlock int64
+ EndingBlock int64
+ Address string
+ StorageKey string
+ LeafsOnly bool
+ }
+}
diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
deleted file mode 100644
index 2ebfa460..00000000
--- a/pkg/rpc/client.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package rpc
-
-type Client interface {
-}
-
-type RpcClient struct {
-}
-
-func NewRpcClient() *RpcClient {
- return &RpcClient{}
-}
-
-func (rpcc *RpcClient) Dial() error {
- return nil
-}
diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go
deleted file mode 100644
index 182d92e1..00000000
--- a/pkg/rpc/server.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package rpc
-
-type Server interface {
-}
-
-type RpcServer struct {
-}
-
-func NewRpcServer() *RpcServer {
- return &RpcServer{}
-}
-
-func (rpcs *RpcServer) Serve() error {
- return nil
-}
diff --git a/pkg/rpc/subscription.go b/pkg/rpc/subscription.go
deleted file mode 100644
index 40b4e1d0..00000000
--- a/pkg/rpc/subscription.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package rpc
-
-// Subscription
-type Subscription struct {
-}
-
-// Params are set by the client to tell the server how to filter that is fed into their subscription
-type Params struct {
- HeaderFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64 // set to 0 or a negative value to have no ending block
- Uncles bool
- }
- TrxFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Src string
- Dst string
- }
- ReceiptFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Topic0s []string
- }
- StateFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Address string // is converted to state key by taking its keccak256 hash
- LeafsOnly bool
- }
- StorageFilter struct {
- Off bool
- StartingBlock int64
- EndingBlock int64
- Address string
- StorageKey string
- LeafsOnly bool
- }
-}