Updates as Per Ian's comments
1. Fix some naming inconsistencies. 2. `GetSyncStatus` returns a bool 3. `Start` handles any `errors` from the `WaitingForSync` function. 4. Add a sleep for the `while` loop. 5. Fix test based on a conditional that would never be met.
This commit is contained in:
parent
c5a36978f8
commit
333dd3ff4f
@ -81,24 +81,43 @@ This state diffing service runs as an auxiliary service concurrent to the regula
|
|||||||
This service introduces a CLI flag namespace `statediff`
|
This service introduces a CLI flag namespace `statediff`
|
||||||
|
|
||||||
`--statediff` flag is used to turn on the service
|
`--statediff` flag is used to turn on the service
|
||||||
|
|
||||||
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
|
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
|
||||||
|
|
||||||
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
|
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
|
||||||
|
|
||||||
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
|
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
|
||||||
|
|
||||||
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
|
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
|
||||||
|
|
||||||
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
|
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
|
||||||
|
|
||||||
`--statediff.db.host` is the hostname/ip to dial to connect to the database
|
`--statediff.db.host` is the hostname/ip to dial to connect to the database
|
||||||
|
|
||||||
`--statediff.db.port` is the port to dial to connect to the database
|
`--statediff.db.port` is the port to dial to connect to the database
|
||||||
|
|
||||||
`--statediff.db.name` is the name of the database to connect to
|
`--statediff.db.name` is the name of the database to connect to
|
||||||
|
|
||||||
`--statediff.db.user` is the user to connect to the database as
|
`--statediff.db.user` is the user to connect to the database as
|
||||||
|
|
||||||
`--statediff.db.password` is the password to use to connect to the database
|
`--statediff.db.password` is the password to use to connect to the database
|
||||||
|
|
||||||
`--statediff.db.conntimeout` is the connection timeout (in seconds)
|
`--statediff.db.conntimeout` is the connection timeout (in seconds)
|
||||||
|
|
||||||
`--statediff.db.maxconns` is the maximum number of database connections
|
`--statediff.db.maxconns` is the maximum number of database connections
|
||||||
|
|
||||||
`--statediff.db.minconns` is the minimum number of database connections
|
`--statediff.db.minconns` is the minimum number of database connections
|
||||||
|
|
||||||
`--statediff.db.maxidleconns` is the maximum number of idle connections
|
`--statediff.db.maxidleconns` is the maximum number of idle connections
|
||||||
|
|
||||||
`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds)
|
`--statediff.db.maxconnidletime` is the maximum lifetime for an idle connection (in seconds)
|
||||||
|
|
||||||
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
|
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
|
||||||
|
|
||||||
`--statediff.db.nodeid` is the node id to use in the Postgres database
|
`--statediff.db.nodeid` is the node id to use in the Postgres database
|
||||||
|
|
||||||
`--statediff.db.clientname` is the client name to use in the Postgres database
|
`--statediff.db.clientname` is the client name to use in the Postgres database
|
||||||
|
|
||||||
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
||||||
|
|
||||||
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
|
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
|
||||||
|
@ -122,7 +122,7 @@ type Service struct {
|
|||||||
// The publicBackendAPI which provides useful information about the current state
|
// The publicBackendAPI which provides useful information about the current state
|
||||||
BackendAPI ethapi.Backend
|
BackendAPI ethapi.Backend
|
||||||
// Should the statediff service wait for geth to sync to head?
|
// Should the statediff service wait for geth to sync to head?
|
||||||
WaitforSync bool
|
WaitForSync bool
|
||||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||||
subscribers int32
|
subscribers int32
|
||||||
// Interface for publishing statediffs as PG-IPLD objects
|
// Interface for publishing statediffs as PG-IPLD objects
|
||||||
@ -183,7 +183,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
SubscriptionTypes: make(map[common.Hash]Params),
|
SubscriptionTypes: make(map[common.Hash]Params),
|
||||||
BlockCache: NewBlockCache(workers),
|
BlockCache: NewBlockCache(workers),
|
||||||
BackendAPI: backend,
|
BackendAPI: backend,
|
||||||
WaitforSync: params.WaitForSync,
|
WaitForSync: params.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
@ -537,19 +537,23 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
|
|||||||
|
|
||||||
// This function will check the status of geth syncing.
|
// This function will check the status of geth syncing.
|
||||||
// It will return false if geth has finished syncing.
|
// It will return false if geth has finished syncing.
|
||||||
// It will return a non false value if geth is not done syncing.
|
// It will return a true Geth is still syncing.
|
||||||
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.PublicEthereumAPI) (interface{}, error) {
|
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.PublicEthereumAPI) (bool, error) {
|
||||||
syncStatus, err := pubEthAPI.Syncing()
|
syncStatus, err := pubEthAPI.Syncing()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return true, err
|
||||||
}
|
}
|
||||||
return syncStatus, err
|
|
||||||
|
if syncStatus != false {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function calls GetSyncStatus to check if we have caught up to head.
|
// This function calls GetSyncStatus to check if we have caught up to head.
|
||||||
// It will keep looking and checking if we have caught up to head.
|
// It will keep looking and checking if we have caught up to head.
|
||||||
// It will only complete if we catch up to head, otherwise it will keep looping forever.
|
// It will only complete if we catch up to head, otherwise it will keep looping forever.
|
||||||
func (sds *Service) WaitForSync() error {
|
func (sds *Service) WaitingForSync() error {
|
||||||
log.Info("We are going to wait for geth to sync to head!")
|
log.Info("We are going to wait for geth to sync to head!")
|
||||||
|
|
||||||
// Has the geth node synced to head?
|
// Has the geth node synced to head?
|
||||||
@ -560,9 +564,11 @@ func (sds *Service) WaitForSync() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if syncStatus == false {
|
if !syncStatus {
|
||||||
log.Info("Geth has caught up to the head of the chain")
|
log.Info("Geth has caught up to the head of the chain")
|
||||||
Synced = true
|
Synced = true
|
||||||
|
} else {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -572,9 +578,12 @@ func (sds *Service) WaitForSync() error {
|
|||||||
func (sds *Service) Start() error {
|
func (sds *Service) Start() error {
|
||||||
log.Info("Starting statediff service")
|
log.Info("Starting statediff service")
|
||||||
|
|
||||||
if sds.WaitforSync {
|
if sds.WaitForSync {
|
||||||
log.Info("Statediff service will wait until geth has caught up to the head of the chain.")
|
log.Info("Statediff service will wait until geth has caught up to the head of the chain.")
|
||||||
sds.WaitForSync()
|
err := sds.WaitingForSync()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
log.Info("Continuing with startdiff start process")
|
log.Info("Continuing with startdiff start process")
|
||||||
}
|
}
|
||||||
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
||||||
|
@ -328,7 +328,7 @@ func createServiceWithMockBackend(curBlock uint64, highestBlock uint64) (*mocks.
|
|||||||
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
||||||
BlockCache: statediff.NewBlockCache(1),
|
BlockCache: statediff.NewBlockCache(1),
|
||||||
BackendAPI: &backend,
|
BackendAPI: &backend,
|
||||||
WaitforSync: true,
|
WaitForSync: true,
|
||||||
}
|
}
|
||||||
return &backend, service
|
return &backend, service
|
||||||
}
|
}
|
||||||
@ -338,7 +338,7 @@ func createServiceWithMockBackend(curBlock uint64, highestBlock uint64) (*mocks.
|
|||||||
func testWaitForSync(t *testing.T) {
|
func testWaitForSync(t *testing.T) {
|
||||||
t.Log("Starting Sync")
|
t.Log("Starting Sync")
|
||||||
_, service := createServiceWithMockBackend(10, 10)
|
_, service := createServiceWithMockBackend(10, 10)
|
||||||
err := service.WaitForSync()
|
err := service.WaitingForSync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Sync Failed")
|
t.Fatal("Sync Failed")
|
||||||
}
|
}
|
||||||
@ -362,7 +362,7 @@ func testGetSyncStatus(t *testing.T) {
|
|||||||
// Start the sync function which will wait for the sync
|
// Start the sync function which will wait for the sync
|
||||||
// Once the sync is complete add a value to the checkSyncComplet channel
|
// Once the sync is complete add a value to the checkSyncComplet channel
|
||||||
t.Log("Starting Sync")
|
t.Log("Starting Sync")
|
||||||
err := service.WaitForSync()
|
err := service.WaitingForSync()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("Sync Failed")
|
t.Error("Sync Failed")
|
||||||
checkSyncComplete <- 1
|
checkSyncComplete <- 1
|
||||||
@ -400,31 +400,28 @@ func testGetSyncStatus(t *testing.T) {
|
|||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
// Make sure if syncStatus is false that WaitForSync has completed!
|
// Make sure if syncStatus is false that WaitForSync has completed!
|
||||||
if syncStatus == false && len(checkSyncComplete) == 0 {
|
if !syncStatus && len(checkSyncComplete) == 0 {
|
||||||
t.Error("Sync is complete but WaitForSync is not")
|
t.Error("Sync is complete but WaitForSync is not")
|
||||||
}
|
}
|
||||||
|
|
||||||
if syncStatus != false && len(checkSyncComplete) == 1 {
|
if syncStatus && len(checkSyncComplete) == 1 {
|
||||||
t.Error("Sync is not complete but WaitForSync is")
|
t.Error("Sync is not complete but WaitForSync is")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure sync hasn't completed and that the checkSyncComplete channel is empty
|
// Make sure sync hasn't completed and that the checkSyncComplete channel is empty
|
||||||
if syncStatus != false && len(checkSyncComplete) == 0 {
|
if syncStatus && len(checkSyncComplete) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// This code will only be run if the sync is complete and the WaitForSync function is complete
|
// This code will only be run if the sync is complete and the WaitForSync function is complete
|
||||||
//t.Log("Backend: ", backend)
|
|
||||||
//t.Log("Sync Status: ", syncStatus)
|
|
||||||
|
|
||||||
// If syncstatus is complete, make sure that the blocks match
|
// If syncstatus is complete, make sure that the blocks match
|
||||||
if syncStatus == false && table.currentBlock != table.highestBlock {
|
if !syncStatus && table.currentBlock != table.highestBlock {
|
||||||
t.Errorf("syncStatus indicated sync was complete even when current block, %d, and highest block %d aren't equal",
|
t.Errorf("syncStatus indicated sync was complete even when current block, %d, and highest block %d aren't equal",
|
||||||
table.currentBlock, table.highestBlock)
|
table.currentBlock, table.highestBlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure that WaitForSync completed once the current block caught up to head!
|
// Make sure that WaitForSync completed once the current block caught up to head!
|
||||||
if len(checkSyncComplete) == 1 {
|
|
||||||
checkSyncCompleteVal := <-checkSyncComplete
|
checkSyncCompleteVal := <-checkSyncComplete
|
||||||
if checkSyncCompleteVal != 0 {
|
if checkSyncCompleteVal != 0 {
|
||||||
t.Errorf("syncStatus indicated sync was complete but the checkSyncComplete has a value of %d",
|
t.Errorf("syncStatus indicated sync was complete but the checkSyncComplete has a value of %d",
|
||||||
@ -433,9 +430,6 @@ func testGetSyncStatus(t *testing.T) {
|
|||||||
t.Log("Test Passed!")
|
t.Log("Test Passed!")
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
|
||||||
t.Error("checkSyncComplete is empty: ", len(checkSyncComplete))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user