additional logging at initialization

This commit is contained in:
Ian Norden 2020-04-23 15:56:37 -05:00
parent 3bd1e518c6
commit ef77688145
11 changed files with 67 additions and 35 deletions

View File

@ -38,19 +38,22 @@ var resyncCmd = &cobra.Command{
}
func rsyncCmdCommand() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading super node configuration variables")
rConfig, err := resync.NewReSyncConfig()
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("vdb version: %s", v.VersionWithMeta)
logWithCommand.Infof("resync config: %+v", rConfig)
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Debug("initializing new resync service")
rService, err := resync.NewResyncService(rConfig)
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Info("starting up resync process")
if err := rService.Resync(); err != nil {
logWithCommand.Fatal(err)
}

View File

@ -53,52 +53,61 @@ and fill in gaps in the data
}
func superNode() {
logWithCommand.Infof("running vdb version: %s", v.VersionWithMeta)
logWithCommand.Debug("loading super node configuration variables")
superNodeConfig, err := super_node.NewSuperNodeConfig()
if err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("vdb version: %s", v.VersionWithMeta)
logWithCommand.Infof("super node config: %+v", superNodeConfig)
if err := ipfs.InitIPFSPlugins(); err != nil {
logWithCommand.Fatal(err)
}
wg := &sync.WaitGroup{}
logWithCommand.Debug("initializing new super node service")
superNode, err := super_node.NewSuperNode(superNodeConfig)
if err != nil {
logWithCommand.Fatal(err)
}
var forwardPayloadChan chan shared.ConvertedData
if superNodeConfig.Serve {
logWithCommand.Info("starting up super node servers")
forwardPayloadChan = make(chan shared.ConvertedData, super_node.PayloadChanBufferSize)
superNode.FilterAndServe(wg, forwardPayloadChan)
superNode.Serve(wg, forwardPayloadChan)
if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.Sync {
if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil {
logWithCommand.Info("starting up super node sync process")
if err := superNode.Sync(wg, forwardPayloadChan); err != nil {
logWithCommand.Fatal(err)
}
}
if superNodeConfig.BackFill {
logWithCommand.Debug("initializing new super node backfill service")
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
if err != nil {
logWithCommand.Fatal(err)
}
backFiller.FillGapsInSuperNode(wg)
logWithCommand.Info("starting up super node backfill process")
backFiller.BackFill(wg)
}
wg.Wait()
}
func startServers(superNode super_node.SuperNode, settings *super_node.Config) error {
logWithCommand.Debug("starting up IPC server")
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
if err != nil {
return err
}
logWithCommand.Debug("starting up WS server")
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, superNode.APIs(), []string{"vdb"}, nil, true)
if err != nil {
return err
}
logWithCommand.Debug("starting up HTTP server")
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, superNode.APIs(), []string{settings.Chain.API()}, nil, nil, rpc.HTTPTimeouts{})
return err
}
@ -106,7 +115,7 @@ func startServers(superNode super_node.SuperNode, settings *super_node.Config) e
func init() {
rootCmd.AddCommand(superNodeCmd)
// flags
// flags for all config variables
superNodeCmd.PersistentFlags().String("ipfs-path", "", "ipfs repository path")
superNodeCmd.PersistentFlags().String("supernode-chain", "", "which chain to support, options are currently Ethereum or Bitcoin.")

View File

@ -19,6 +19,8 @@ package ipfs
import (
"context"
"github.com/sirupsen/logrus"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/plugin/loader"
@ -29,6 +31,7 @@ import (
// InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node
// This should only be called once
func InitIPFSPlugins() error {
logrus.Debug("initializing IPFS plugins")
l, err := loader.NewPluginLoader("")
if err != nil {
return err
@ -42,6 +45,7 @@ func InitIPFSPlugins() error {
// InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs)
func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) {
logrus.Debug("initializing IPFS block service interface")
r, openErr := fsrepo.Open(ipfsPath)
if openErr != nil {
return nil, openErr
@ -68,6 +72,7 @@ func (ipfs IPFS) Add(node ipld.Node) error {
}
func InitIPFSNode(repoPath string) (*IPFS, error) {
logrus.Debug("initializing IPFS node interface")
r, err := fsrepo.Open(repoPath)
if err != nil {
return nil, err

View File

@ -36,7 +36,7 @@ const (
// BackFillInterface for filling in gaps in the super node
type BackFillInterface interface {
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
FillGapsInSuperNode(wg *sync.WaitGroup)
BackFill(wg *sync.WaitGroup)
}
// BackFillService for filling in gaps in the super node
@ -113,8 +113,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
}, nil
}
// FillGapsInSuperNode periodically checks for and fills in gaps in the super node db
func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
// BackFill periodically checks for and fills in gaps in the super node db
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
ticker := time.NewTicker(bfs.GapCheckFrequency)
wg.Add(1)
@ -151,7 +151,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
}
}
}()
log.Infof("%s fillGaps goroutine successfully spun up", bfs.Chain.String())
log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String())
}
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks

View File

@ -72,7 +72,7 @@ var _ = Describe("BackFiller", func() {
Chain: shared.Ethereum,
}
wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg)
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
@ -128,7 +128,7 @@ var _ = Describe("BackFiller", func() {
Chain: shared.Ethereum,
}
wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg)
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
@ -178,7 +178,7 @@ var _ = Describe("BackFiller", func() {
Chain: shared.Ethereum,
}
wg := &sync.WaitGroup{}
backfiller.FillGapsInSuperNode(wg)
backfiller.BackFill(wg)
time.Sleep(time.Second * 3)
quitChan <- true
Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))

View File

@ -42,7 +42,7 @@ func NewHTTPPayloadStreamer(clientConfig *rpcclient.ConnConfig) *HTTPPayloadStre
// Stream is the main loop for subscribing to data from the btc block notifications
// Satisfies the shared.PayloadStreamer interface
func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
logrus.Info("streaming block payloads from btc")
logrus.Debug("streaming block payloads from btc")
client, err := rpcclient.New(ps.Config, nil)
if err != nil {
return nil, err

View File

@ -602,8 +602,14 @@ var _ = Describe("Retriever", func() {
payload8.HeaderCID.BlockNumber = "105"
payload9 := payload4
payload9.HeaderCID.BlockNumber = "106"
payload10 := payload5
payload10.HeaderCID.BlockNumber = "1000"
payload10 := payload4
payload10.HeaderCID.BlockNumber = "107"
payload11 := payload4
payload11.HeaderCID.BlockNumber = "108"
payload12 := payload4
payload12.HeaderCID.BlockNumber = "109"
payload13 := payload5
payload13.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
@ -624,18 +630,25 @@ var _ = Describe("Retriever", func() {
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload10)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload11)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload12)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload13)
Expect(err).ToNot(HaveOccurred())
cleaner := eth.NewCleaner(db)
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}})
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}})
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(5))
Expect(len(gaps)).To(Equal(6))
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 106, Stop: 108})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 110, Stop: 999})).To(BeTrue())
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
})
})

View File

@ -51,7 +51,7 @@ func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
// Satisfies the shared.PayloadStreamer interface
func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
stateDiffChan := make(chan statediff.Payload, PayloadChanBufferSize)
logrus.Info("streaming diffs from geth")
logrus.Debug("streaming diffs from geth")
go func() {
for {
select {

View File

@ -38,15 +38,15 @@ const (
)
// SuperNode is the top level interface for streaming, converting to IPLDs, publishing,
// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
// and indexing all chain data; screening this data; and serving it up to subscribed clients
// This service is compatible with the Ethereum service interface (node.Service)
type SuperNode interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
// Data processing event loop
ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
// Pub-Sub handling event loop
FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
// Method to subscribe to the service
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
// Method to unsubscribe from the service
@ -186,11 +186,11 @@ func (sap *Service) APIs() []rpc.API {
return append(apis, chainAPI)
}
// ProcessData streams incoming raw chain data and converts it for further processing
// Sync streams incoming raw chain data and converts it for further processing
// It forwards the converted data to the publishAndIndex process(es) it spins up
// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel
// This continues on no matter if or how many subscribers there are
func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil {
return err
@ -213,7 +213,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err)
continue
}
log.Infof("processing %s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
log.Infof("%s data streamed at head height %d", sap.chain.String(), ipldPayload.Height())
// If we have a ScreenAndServe process running, forward the iplds to it
select {
case screenAndServePayload <- ipldPayload:
@ -230,7 +230,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
}
}
}()
log.Infof("%s ProcessData goroutine successfully spun up", sap.chain.String())
log.Infof("%s Sync goroutine successfully spun up", sap.chain.String())
return nil
}
@ -241,11 +241,13 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
for {
select {
case payload := <-publishAndIndexPayload:
log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height())
cidPayload, err := sap.Publisher.Publish(payload)
if err != nil {
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
continue
}
log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height())
if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
}
@ -255,11 +257,11 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
}
// FilterAndServe listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
// It filters and sends this data to any subscribers to the service
// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
wg.Add(1)
go func() {
for {
@ -273,7 +275,7 @@ func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
}
}
}()
log.Infof("%s FilterAndServe goroutine successfully spun up", sap.chain.String())
log.Infof("%s Serve goroutine successfully spun up", sap.chain.String())
}
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
@ -444,10 +446,10 @@ func (sap *Service) Start(*p2p.Server) error {
log.Infof("Starting %s super node service", sap.chain.String())
wg := new(sync.WaitGroup)
payloadChan := make(chan shared.ConvertedData, PayloadChanBufferSize)
if err := sap.ProcessData(wg, payloadChan); err != nil {
if err := sap.Sync(wg, payloadChan); err != nil {
return err
}
sap.FilterAndServe(wg, payloadChan)
sap.Serve(wg, payloadChan)
return nil
}

View File

@ -31,7 +31,7 @@ import (
)
var _ = Describe("Service", func() {
Describe("SyncAndPublish", func() {
Describe("Sync", func() {
It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan shared.RawChainData, 1)
@ -63,7 +63,7 @@ var _ = Describe("Service", func() {
QuitChan: quitChan,
WorkerPoolSize: 1,
}
err := processor.ProcessData(wg, nil)
err := processor.Sync(wg, nil)
Expect(err).ToNot(HaveOccurred())
time.Sleep(2 * time.Second)
quitChan <- true

View File

@ -21,7 +21,7 @@ import "fmt"
const (
VersionMajor = 0 // Major version component of the current release
VersionMinor = 1 // Minor version component of the current release
VersionPatch = 0 // Patch version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionMeta = "alpha" // Version metadata to append to the version string
)