pub sub service and api

This commit is contained in:
Ian Norden 2019-05-17 01:27:02 -05:00
parent 5ebe2243d8
commit b83c0371d9
8 changed files with 401 additions and 234 deletions

82
pkg/ipfs/api.go Normal file
View File

@ -0,0 +1,82 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
"context"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
// APIName is the namespace used for the state diffing service API
const APIName = "vulcanizedb"
// APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1"
// PublicSeedNodeAPI
type PublicSeedNodeAPI struct {
snp SyncPublishAndServe
}
// NewPublicSeedNodeAPI
func NewPublicSeedNodeAPI(snp SyncPublishAndServe) *PublicSeedNodeAPI {
return &PublicSeedNodeAPI{
snp: snp,
}
}
// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created
func (api *PublicSeedNodeAPI) Subscribe(ctx context.Context, params *Params) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
// create subscription and start waiting for statediff events
rpcSub := notifier.CreateSubscription()
go func() {
// subscribe to events from the state diff service
payloadChannel := make(chan ResponsePayload)
quitChan := make(chan bool)
api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
// loop and await state diff payloads and relay them to the subscriber with then notifier
for {
select {
case packet := <-payloadChannel:
if err := notifier.Notify(rpcSub.ID, packet); err != nil {
log.Error("Failed to send state diff packet", "err", err)
}
case <-rpcSub.Err():
err := api.snp.Unsubscribe(rpcSub.ID)
if err != nil {
log.Error("Failed to unsubscribe from the state diff service", err)
}
return
case <-quitChan:
// don't need to unsubscribe, statediff service does so before sending the quit signal
return
}
}
}()
return rpcSub, nil
}

View File

@ -1,109 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
"sync"
"github.com/ethereum/go-ethereum/statediff"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
// This is the top-level interface used by the syncAndPublish command
type SyncAndPublish interface {
Process(wg *sync.WaitGroup) error
}
// Processor is the underlying struct for the SyncAndPublish interface
type Processor struct {
// Interface for streaming statediff payloads over a geth rpc subscription
Streamer StateDiffStreamer
// Interface for converting statediff payloads into ETH-IPLD object payloads
Converter PayloadConverter
// Interface for publishing the ETH-IPLD payloads to IPFS
Publisher IPLDPublisher
// Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
Repository CIDRepository
// Chan the processor uses to subscribe to state diff payloads from the Streamer
PayloadChan chan statediff.Payload
// Chan used to shut down the Processor
QuitChan chan bool
}
// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncAndPublish, error) {
publisher, err := NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
}
return &Processor{
Streamer: NewStateDiffStreamer(rpcClient),
Repository: NewCIDRepository(db),
Converter: NewPayloadConverter(ethClient),
Publisher: publisher,
PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
QuitChan: qc,
}, nil
}
// Process is the main processing loop
func (i *Processor) Process(wg *sync.WaitGroup) error {
sub, err := i.Streamer.Stream(i.PayloadChan)
if err != nil {
return err
}
wg.Add(1)
go func() {
for {
select {
case payload := <-i.PayloadChan:
if payload.Err != nil {
log.Error(err)
continue
}
ipldPayload, err := i.Converter.Convert(payload)
if err != nil {
log.Error(err)
continue
}
cidPayload, err := i.Publisher.Publish(ipldPayload)
if err != nil {
log.Error(err)
continue
}
err = i.Repository.Index(cidPayload)
if err != nil {
log.Error(err)
}
case err = <-sub.Err():
log.Error(err)
case <-i.QuitChan:
log.Info("quiting IPFSProcessor")
wg.Done()
return
}
}
}()
return nil
}

240
pkg/ipfs/service.go Normal file
View File

@ -0,0 +1,240 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipfs
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
const payloadChanBufferSize = 800 // 1/10th max eth sub buffer size
// SyncAndPublish is an interface for streaming, converting to IPLDs, publishing, and indexing all Ethereum data
// This is the top-level interface used by the syncAndPublish command
type SyncPublishAndServe interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
// Main event loop for syncAndPublish processes
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error
// Main event loop for handling client pub-sub
Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
}
// Processor is the underlying struct for the SyncAndPublish interface
type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
// Interface for streaming statediff payloads over a geth rpc subscription
Streamer StateDiffStreamer
// Interface for converting statediff payloads into ETH-IPLD object payloads
Converter PayloadConverter
// Interface for publishing the ETH-IPLD payloads to IPFS
Publisher IPLDPublisher
// Interface for indexing the CIDs of the published ETH-IPLDs in Postgres
Repository CIDRepository
// Chan the processor uses to subscribe to state diff payloads from the Streamer
PayloadChan chan statediff.Payload
// Used to signal shutdown of the service
QuitChan chan bool
// A mapping of rpc.IDs to their subscription channels
Subscriptions map[rpc.ID]Subscription
}
// NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient, rpcClient core.RpcClient, qc chan bool) (SyncPublishAndServe, error) {
publisher, err := NewIPLDPublisher(ipfsPath)
if err != nil {
return nil, err
}
return &Service{
Streamer: NewStateDiffStreamer(rpcClient),
Repository: NewCIDRepository(db),
Converter: NewPayloadConverter(ethClient),
Publisher: publisher,
PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
QuitChan: qc,
}, nil
}
// Protocols exports the services p2p protocols, this service has none
func (sap *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns the RPC descriptors the StateDiffingService offers
func (sap *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: APIName,
Version: APIVersion,
Service: NewPublicSeedNodeAPI(sap),
Public: true,
},
}
}
// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
// It then forwards the data to the Serve() loop which filters and sends relevent data to client subscriptions
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- IPLDPayload, forwardQuitchan chan<- bool) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil {
return err
}
wg.Add(1)
go func() {
for {
select {
case payload := <-sap.PayloadChan:
if payload.Err != nil {
log.Error(err)
continue
}
ipldPayload, err := sap.Converter.Convert(payload)
if err != nil {
log.Error(err)
continue
}
select {
case forwardPayloadChan <- *ipldPayload:
default:
}
cidPayload, err := sap.Publisher.Publish(ipldPayload)
if err != nil {
log.Error(err)
continue
}
err = sap.Repository.Index(cidPayload)
if err != nil {
log.Error(err)
}
case err = <-sub.Err():
log.Error(err)
case <-sap.QuitChan:
select {
case forwardQuitchan <- true:
default:
}
log.Info("quiting SyncAndPublish process")
wg.Done()
return
}
}
}()
return nil
}
func (sap *Service) Serve(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool) {
wg.Add(1)
go func() {
for {
select {
case payload := <-receivePayloadChan:
println(payload.BlockNumber.Int64())
// Method for using subscription parameters to filter payload and stream relevent info to sub channel
case <-receiveQuitchan:
log.Info("quiting Serve process")
wg.Done()
return
}
}
}()
}
// Subscribe is used by the API to subscribe to the StateDiffingService loop
func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, params *Params) {
log.Info("Subscribing to the statediff service")
sap.Lock()
sap.Subscriptions[id] = Subscription{
PayloadChan: sub,
QuitChan: quitChan,
}
sap.Unlock()
}
// Unsubscribe is used to unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) error {
log.Info("Unsubscribing from the statediff service")
sap.Lock()
_, ok := sap.Subscriptions[id]
if !ok {
return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
}
delete(sap.Subscriptions, id)
sap.Unlock()
return nil
}
// Start is used to begin the StateDiffingService
func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting statediff service")
wg := new(sync.WaitGroup)
payloadChan := make(chan IPLDPayload)
quitChan := make(chan bool)
go sap.SyncAndPublish(wg, payloadChan, quitChan)
go sap.Serve(wg, payloadChan, quitChan)
return nil
}
// Stop is used to close down the StateDiffingService
func (sap *Service) Stop() error {
log.Info("Stopping statediff service")
close(sap.QuitChan)
return nil
}
// send is used to fan out and serve a payload to any subscriptions
func (sap *Service) send(payload ResponsePayload) {
sap.Lock()
for id, sub := range sap.Subscriptions {
select {
case sub.PayloadChan <- payload:
log.Infof("sending state diff payload to subscription %s", id)
default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
}
}
sap.Unlock()
}
// close is used to close all listening subscriptions
func (sap *Service) close() {
sap.Lock()
for id, sub := range sap.Subscriptions {
select {
case sub.QuitChan <- true:
delete(sap.Subscriptions, id)
log.Infof("closing subscription %s", id)
default:
log.Infof("unable to close subscription %s; channel has no receiver", id)
}
}
sap.Unlock()
}

View File

@ -30,9 +30,9 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks" "github.com/vulcanize/vulcanizedb/pkg/ipfs/test_helpers/mocks"
) )
var _ = Describe("Processor", func() { var _ = Describe("Service", func() {
Describe("Process", func() { Describe("Loop", func() {
It("Streams StatediffPayloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { It("Streams StatediffPayloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan statediff.Payload, 1) payloadChan := make(chan statediff.Payload, 1)
@ -55,7 +55,7 @@ var _ = Describe("Processor", func() {
ReturnIPLDPayload: &test_helpers.MockIPLDPayload, ReturnIPLDPayload: &test_helpers.MockIPLDPayload,
ReturnErr: nil, ReturnErr: nil,
} }
processor := &ipfs.Processor{ processor := &ipfs.Service{
Repository: mockCidRepo, Repository: mockCidRepo,
Publisher: mockPublisher, Publisher: mockPublisher,
Streamer: mockStreamer, Streamer: mockStreamer,
@ -63,7 +63,7 @@ var _ = Describe("Processor", func() {
PayloadChan: payloadChan, PayloadChan: payloadChan,
QuitChan: quitChan, QuitChan: quitChan,
} }
err := processor.Process(wg) err := processor.SyncAndPublish(wg, nil, nil)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
quitChan <- true quitChan <- true

View File

@ -17,12 +17,49 @@
package ipfs package ipfs
import ( import (
"encoding/json"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
//
type Subscription struct {
PayloadChan chan<- ResponsePayload
QuitChan chan<- bool
}
type ResponsePayload struct {
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
TransactionsRlp [][]byte `json:"transactionsRlp"`
ReceiptsRlp [][]byte `json:"receiptsRlp"`
StateNodesRlp [][]byte `json:"stateNodesRlp"`
StorageNodesRlp [][]byte `json:"storageNodesRlp"`
encoded []byte
err error
}
func (sd *ResponsePayload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
func (sd *ResponsePayload) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sd *ResponsePayload) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}
// IPLDPayload is a custom type which packages ETH data for the IPFS publisher // IPLDPayload is a custom type which packages ETH data for the IPFS publisher
type IPLDPayload struct { type IPLDPayload struct {
HeaderRLP []byte HeaderRLP []byte
@ -82,3 +119,41 @@ type TrxMetaData struct {
To string To string
From string From string
} }
// Params are set by the client to tell the server how to filter that is fed into their subscription
type Params struct {
HeaderFilter struct {
Off bool
StartingBlock int64
EndingBlock int64 // set to 0 or a negative value to have no ending block
Uncles bool
}
TrxFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Src string
Dst string
}
ReceiptFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Topic0s []string
}
StateFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Address string // is converted to state key by taking its keccak256 hash
LeafsOnly bool
}
StorageFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Address string
StorageKey string
LeafsOnly bool
}
}

View File

@ -1,31 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package rpc
type Client interface {
}
type RpcClient struct {
}
func NewRpcClient() *RpcClient {
return &RpcClient{}
}
func (rpcc *RpcClient) Dial() error {
return nil
}

View File

@ -1,31 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package rpc
type Server interface {
}
type RpcServer struct {
}
func NewRpcServer() *RpcServer {
return &RpcServer{}
}
func (rpcs *RpcServer) Serve() error {
return nil
}

View File

@ -1,59 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package rpc
// Subscription
type Subscription struct {
}
// Params are set by the client to tell the server how to filter that is fed into their subscription
type Params struct {
HeaderFilter struct {
Off bool
StartingBlock int64
EndingBlock int64 // set to 0 or a negative value to have no ending block
Uncles bool
}
TrxFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Src string
Dst string
}
ReceiptFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Topic0s []string
}
StateFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Address string // is converted to state key by taking its keccak256 hash
LeafsOnly bool
}
StorageFilter struct {
Off bool
StartingBlock int64
EndingBlock int64
Address string
StorageKey string
LeafsOnly bool
}
}