forked from cerc-io/ipld-eth-server
Merge pull request #189 from vulcanize/additional_logging
Additional logging
This commit is contained in:
commit
96162e1853
@ -38,19 +38,22 @@ var resyncCmd = &cobra.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func rsyncCmdCommand() {
|
func rsyncCmdCommand() {
|
||||||
|
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
|
||||||
|
logWithCommand.Debug("loading super node configuration variables")
|
||||||
rConfig, err := resync.NewReSyncConfig()
|
rConfig, err := resync.NewReSyncConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
logWithCommand.Infof("vdb version: %s", v.VersionWithMeta)
|
|
||||||
logWithCommand.Infof("resync config: %+v", rConfig)
|
logWithCommand.Infof("resync config: %+v", rConfig)
|
||||||
if err := ipfs.InitIPFSPlugins(); err != nil {
|
if err := ipfs.InitIPFSPlugins(); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
logWithCommand.Debug("initializing new resync service")
|
||||||
rService, err := resync.NewResyncService(rConfig)
|
rService, err := resync.NewResyncService(rConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
logWithCommand.Info("starting up resync process")
|
||||||
if err := rService.Resync(); err != nil {
|
if err := rService.Resync(); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -53,52 +53,61 @@ and fill in gaps in the data
|
|||||||
}
|
}
|
||||||
|
|
||||||
func superNode() {
|
func superNode() {
|
||||||
|
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
|
||||||
|
logWithCommand.Debug("loading super node configuration variables")
|
||||||
superNodeConfig, err := super_node.NewSuperNodeConfig()
|
superNodeConfig, err := super_node.NewSuperNodeConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
logWithCommand.Infof("vdb version: %s", v.VersionWithMeta)
|
|
||||||
logWithCommand.Infof("super node config: %+v", superNodeConfig)
|
logWithCommand.Infof("super node config: %+v", superNodeConfig)
|
||||||
if err := ipfs.InitIPFSPlugins(); err != nil {
|
if err := ipfs.InitIPFSPlugins(); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
logWithCommand.Debug("initializing new super node service")
|
||||||
superNode, err := super_node.NewSuperNode(superNodeConfig)
|
superNode, err := super_node.NewSuperNode(superNodeConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
var forwardPayloadChan chan shared.ConvertedData
|
var forwardPayloadChan chan shared.ConvertedData
|
||||||
if superNodeConfig.Serve {
|
if superNodeConfig.Serve {
|
||||||
|
logWithCommand.Info("starting up super node servers")
|
||||||
forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize)
|
forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize)
|
||||||
superNode.FilterAndServe(wg, forwardPayloadChan)
|
superNode.Serve(wg, forwardPayloadChan)
|
||||||
if err := startServers(superNode, superNodeConfig); err != nil {
|
if err := startServers(superNode, superNodeConfig); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if superNodeConfig.Sync {
|
if superNodeConfig.Sync {
|
||||||
if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil {
|
logWithCommand.Info("starting up super node sync process")
|
||||||
|
if err := superNode.Sync(wg, forwardPayloadChan); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if superNodeConfig.BackFill {
|
if superNodeConfig.BackFill {
|
||||||
|
logWithCommand.Debug("initializing new super node backfill service")
|
||||||
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
|
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
backFiller.FillGapsInSuperNode(wg)
|
logWithCommand.Info("starting up super node backfill process")
|
||||||
|
backFiller.BackFill(wg)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServers(superNode super_node.SuperNode, settings *super_node.Config) error {
|
func startServers(superNode super_node.SuperNode, settings *super_node.Config) error {
|
||||||
|
logWithCommand.Debug("starting up IPC server")
|
||||||
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
|
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
logWithCommand.Debug("starting up WS server")
|
||||||
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true)
|
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
logWithCommand.Debug("starting up HTTP server")
|
||||||
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
|
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -106,7 +115,7 @@ func startServers(superNode super_node.SuperNode, settings *super_node.Config) e
|
|||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(superNodeCmd)
|
rootCmd.AddCommand(superNodeCmd)
|
||||||
|
|
||||||
// flags
|
// flags for all config variables
|
||||||
superNodeCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path")
|
superNodeCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path")
|
||||||
|
|
||||||
superNodeCmd.PersistentFlags().String("supernode-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.")
|
superNodeCmd.PersistentFlags().String("supernode-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.")
|
||||||
|
@ -19,6 +19,8 @@ package ipfs
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-ipfs/core"
|
"github.com/ipfs/go-ipfs/core"
|
||||||
"github.com/ipfs/go-ipfs/plugin/loader"
|
"github.com/ipfs/go-ipfs/plugin/loader"
|
||||||
@ -29,6 +31,7 @@ import (
|
|||||||
// InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node
|
// InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node
|
||||||
// This should only be called once
|
// This should only be called once
|
||||||
func InitIPFSPlugins() error {
|
func InitIPFSPlugins() error {
|
||||||
|
logrus.Debug("initializing IPFS plugins")
|
||||||
l, err := loader.NewPluginLoader("")
|
l, err := loader.NewPluginLoader("")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -42,6 +45,7 @@ func InitIPFSPlugins() error {
|
|||||||
|
|
||||||
// InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs)
|
// InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs)
|
||||||
func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) {
|
func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) {
|
||||||
|
logrus.Debug("initializing IPFS block service interface")
|
||||||
r, openErr := fsrepo.Open(ipfsPath)
|
r, openErr := fsrepo.Open(ipfsPath)
|
||||||
if openErr != nil {
|
if openErr != nil {
|
||||||
return nil, openErr
|
return nil, openErr
|
||||||
@ -68,6 +72,7 @@ func (ipfs IPFS) Add(node ipld.Node) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func InitIPFSNode(repoPath string) (*IPFS, error) {
|
func InitIPFSNode(repoPath string) (*IPFS, error) {
|
||||||
|
logrus.Debug("initializing IPFS node interface")
|
||||||
r, err := fsrepo.Open(repoPath)
|
r, err := fsrepo.Open(repoPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -36,7 +36,7 @@ const (
|
|||||||
// BackFillInterface for filling in gaps in the super node
|
// BackFillInterface for filling in gaps in the super node
|
||||||
type BackFillInterface interface {
|
type BackFillInterface interface {
|
||||||
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
|
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
|
||||||
FillGapsInSuperNode(wg *sync.WaitGroup)
|
BackFill(wg *sync.WaitGroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackFillService for filling in gaps in the super node
|
// BackFillService for filling in gaps in the super node
|
||||||
@ -113,8 +113,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FillGapsInSuperNode periodically checks for and fills in gaps in the super node db
|
// BackFill periodically checks for and fills in gaps in the super node db
|
||||||
func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
|
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
|
||||||
ticker := time.NewTicker(bfs.GapCheckFrequency)
|
ticker := time.NewTicker(bfs.GapCheckFrequency)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
@ -151,7 +151,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String())
|
log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
|
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
|
||||||
|
@ -72,7 +72,7 @@ var _ = Describe("BackFiller", func() {
|
|||||||
Chain: shared.Ethereum,
|
Chain: shared.Ethereum,
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.FillGapsInSuperNode(wg)
|
backfiller.BackFill(wg)
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
quitChan <- true
|
quitChan <- true
|
||||||
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
|
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
|
||||||
@ -128,7 +128,7 @@ var _ = Describe("BackFiller", func() {
|
|||||||
Chain: shared.Ethereum,
|
Chain: shared.Ethereum,
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.FillGapsInSuperNode(wg)
|
backfiller.BackFill(wg)
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
quitChan <- true
|
quitChan <- true
|
||||||
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
|
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
|
||||||
@ -178,7 +178,7 @@ var _ = Describe("BackFiller", func() {
|
|||||||
Chain: shared.Ethereum,
|
Chain: shared.Ethereum,
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.FillGapsInSuperNode(wg)
|
backfiller.BackFill(wg)
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
quitChan <- true
|
quitChan <- true
|
||||||
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
|
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
|
||||||
|
@ -42,7 +42,7 @@ func NewHTTPPayloadStreamer(clientConfig *rpcclient.ConnConfig) *HTTPPayloadStre
|
|||||||
// Stream is the main loop for subscribing to data from the btc block notifications
|
// Stream is the main loop for subscribing to data from the btc block notifications
|
||||||
// Satisfies the shared.PayloadStreamer interface
|
// Satisfies the shared.PayloadStreamer interface
|
||||||
func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
|
func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
|
||||||
logrus.Info("streaming block payloads from btc")
|
logrus.Debug("streaming block payloads from btc")
|
||||||
client, err := rpcclient.New(ps.Config, nil)
|
client, err := rpcclient.New(ps.Config, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -602,8 +602,14 @@ var _ = Describe("Retriever", func() {
|
|||||||
payload8.HeaderCID.BlockNumber = "105"
|
payload8.HeaderCID.BlockNumber = "105"
|
||||||
payload9 := payload4
|
payload9 := payload4
|
||||||
payload9.HeaderCID.BlockNumber = "106"
|
payload9.HeaderCID.BlockNumber = "106"
|
||||||
payload10 := payload5
|
payload10 := payload4
|
||||||
payload10.HeaderCID.BlockNumber = "1000"
|
payload10.HeaderCID.BlockNumber = "107"
|
||||||
|
payload11 := payload4
|
||||||
|
payload11.HeaderCID.BlockNumber = "108"
|
||||||
|
payload12 := payload4
|
||||||
|
payload12.HeaderCID.BlockNumber = "109"
|
||||||
|
payload13 := payload5
|
||||||
|
payload13.HeaderCID.BlockNumber = "1000"
|
||||||
err := repo.Index(&payload1)
|
err := repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload2)
|
||||||
@ -624,18 +630,25 @@ var _ = Describe("Retriever", func() {
|
|||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload10)
|
err = repo.Index(&payload10)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload11)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload12)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload13)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
cleaner := eth.NewCleaner(db)
|
cleaner := eth.NewCleaner(db)
|
||||||
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}})
|
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}})
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(5))
|
Expect(len(gaps)).To(Equal(6))
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 106, Stop: 108})).To(BeTrue())
|
||||||
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 110, Stop: 999})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -51,7 +51,7 @@ func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
|
|||||||
// Satisfies the shared.PayloadStreamer interface
|
// Satisfies the shared.PayloadStreamer interface
|
||||||
func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
|
func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
|
||||||
stateDiffChan := make(chan statediff.Payload, PayloadChanBufferSize)
|
stateDiffChan := make(chan statediff.Payload, PayloadChanBufferSize)
|
||||||
logrus.Info("streaming diffs from geth")
|
logrus.Debug("streaming diffs from geth")
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -38,15 +38,15 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// SuperNode is the top level interface for streaming, converting to IPLDs, publishing,
|
// SuperNode is the top level interface for streaming, converting to IPLDs, publishing,
|
||||||
// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
|
// and indexing all chain data; screening this data; and serving it up to subscribed clients
|
||||||
// This service is compatible with the Ethereum service interface (node.Service)
|
// This service is compatible with the Ethereum service interface (node.Service)
|
||||||
type SuperNode interface {
|
type SuperNode interface {
|
||||||
// APIs(), Protocols(), Start() and Stop()
|
// APIs(), Protocols(), Start() and Stop()
|
||||||
node.Service
|
node.Service
|
||||||
// Data processing event loop
|
// Data processing event loop
|
||||||
ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
|
Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
|
||||||
// Pub-Sub handling event loop
|
// Pub-Sub handling event loop
|
||||||
FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
|
Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
|
||||||
// Method to subscribe to the service
|
// Method to subscribe to the service
|
||||||
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
|
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
|
||||||
// Method to unsubscribe from the service
|
// Method to unsubscribe from the service
|
||||||
@ -186,11 +186,11 @@ func (sap *Service) APIs() []rpc.API {
|
|||||||
return append(apis, chainAPI)
|
return append(apis, chainAPI)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessData streams incoming raw chain data and converts it for further processing
|
// Sync streams incoming raw chain data and converts it for further processing
|
||||||
// It forwards the converted data to the publishAndIndex process(es) it spins up
|
// It forwards the converted data to the publishAndIndex process(es) it spins up
|
||||||
// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel
|
// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel
|
||||||
// This continues on no matter if or how many subscribers there are
|
// This continues on no matter if or how many subscribers there are
|
||||||
func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
|
func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
|
||||||
sub, err := sap.Streamer.Stream(sap.PayloadChan)
|
sub, err := sap.Streamer.Stream(sap.PayloadChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -213,7 +213,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
|
|||||||
log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err)
|
log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Infof("processing %s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
|
log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
|
||||||
// If we have a ScreenAndServe process running, forward the iplds to it
|
// If we have a ScreenAndServe process running, forward the iplds to it
|
||||||
select {
|
select {
|
||||||
case screenAndServePayload <- ipldPayload:
|
case screenAndServePayload <- ipldPayload:
|
||||||
@ -230,7 +230,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
log.Infof("%s ProcessData goroutine successfully spun up", sap.chain.String())
|
log.Infof("%s Sync goroutine successfully spun up", sap.chain.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,11 +241,13 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case payload := <-publishAndIndexPayload:
|
case payload := <-publishAndIndexPayload:
|
||||||
|
log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height())
|
||||||
cidPayload, err := sap.Publisher.Publish(payload)
|
cidPayload, err := sap.Publisher.Publish(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
|
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height())
|
||||||
if err := sap.Indexer.Index(cidPayload); err != nil {
|
if err := sap.Indexer.Index(cidPayload); err != nil {
|
||||||
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
|
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
|
||||||
}
|
}
|
||||||
@ -255,11 +257,11 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
|
|||||||
log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
|
log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterAndServe listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
|
// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
|
||||||
// It filters and sends this data to any subscribers to the service
|
// It filters and sends this data to any subscribers to the service
|
||||||
// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process
|
// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process
|
||||||
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
|
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
|
||||||
func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
|
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -273,7 +275,7 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
log.Infof("%s FilterAndServe goroutine successfully spun up", sap.chain.String())
|
log.Infof("%s Serve goroutine successfully spun up", sap.chain.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
|
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
|
||||||
@ -444,10 +446,10 @@ func (sap *Service) Start(*p2p.Server) error {
|
|||||||
log.Infof("Starting %s super node service", sap.chain.String())
|
log.Infof("Starting %s super node service", sap.chain.String())
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
|
payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
|
||||||
if err := sap.ProcessData(wg, payloadChan); err != nil {
|
if err := sap.Sync(wg, payloadChan); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sap.FilterAndServe(wg, payloadChan)
|
sap.Serve(wg, payloadChan)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Service", func() {
|
var _ = Describe("Service", func() {
|
||||||
Describe("SyncAndPublish", func() {
|
Describe("Sync", func() {
|
||||||
It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
|
It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
payloadChan := make(chan shared.RawChainData, 1)
|
payloadChan := make(chan shared.RawChainData, 1)
|
||||||
@ -63,7 +63,7 @@ var _ = Describe("Service", func() {
|
|||||||
QuitChan: quitChan,
|
QuitChan: quitChan,
|
||||||
WorkerPoolSize: 1,
|
WorkerPoolSize: 1,
|
||||||
}
|
}
|
||||||
err := processor.ProcessData(wg, nil)
|
err := processor.Sync(wg, nil)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
quitChan <- true
|
quitChan <- true
|
||||||
|
@ -21,7 +21,7 @@ import "fmt"
|
|||||||
const (
|
const (
|
||||||
VersionMajor = 0 // Major version component of the current release
|
VersionMajor = 0 // Major version component of the current release
|
||||||
VersionMinor = 1 // Minor version component of the current release
|
VersionMinor = 1 // Minor version component of the current release
|
||||||
VersionPatch = 0 // Patch version component of the current release
|
VersionPatch = 1 // Patch version component of the current release
|
||||||
VersionMeta = "alpha" // Version metadata to append to the version string
|
VersionMeta = "alpha" // Version metadata to append to the version string
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user