Change behavior of WriteStateDiffAt to async (#306)

- returns job ID and provides status update stream
- won't start redundant jobs
This commit is contained in:
Roy Crihfield 2023-01-18 09:40:17 -06:00 committed by GitHub
parent fe49b8a23c
commit ba297e4a35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 8 deletions

View File

@ -141,13 +141,12 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN
}
// WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error {
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID {
var err error
start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber)
defer countApiRequestEnd(start, logger, err)
err = api.sds.WriteStateDiffAt(blockNumber, params)
return err
return api.sds.WriteStateDiffAt(blockNumber, params)
}
// WriteStateDiffFor writes a state diff object directly to DB for the specific block hash
@ -164,3 +163,51 @@ func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash
func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error {
return api.sds.WatchAddress(operation, args)
}
// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff*
func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
// create subscription and start waiting for events
rpcSub := notifier.CreateSubscription()
go func() {
// subscribe to events from the statediff service
statusChan := make(chan JobStatus, chainEventChanSize)
quitChan := make(chan bool, 1)
api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan)
var err error
defer func() {
if err != nil {
if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil {
log.Error("Failed to unsubscribe from job status stream: " + err.Error())
}
}
}()
// loop and await payloads and relay them to the subscriber with the notifier
for {
select {
case status := <-statusChan:
if err = notifier.Notify(rpcSub.ID, status); err != nil {
log.Error("Failed to send job status; error: " + err.Error())
return
}
case err = <-rpcSub.Err():
if err != nil {
log.Error("State diff service rpcSub error: " + err.Error())
return
}
case <-quitChan:
// don't need to unsubscribe, service does so before sending the quit signal
return
}
}
}()
return rpcSub, nil
}

View File

@ -87,7 +87,7 @@ type IService interface {
APIs() []rpc.API
// Loop is the main event loop for processing state diffs
Loop(chainEventCh chan core.ChainEvent)
// Subscribe method to subscribe to receive state diff processing output`
// Subscribe method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
// Unsubscribe method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
@ -100,13 +100,18 @@ type IService interface {
// StreamCodeAndCodeHash method to stream out all code and codehash pairs
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool)
// WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params Params) error
WriteStateDiffAt(blockNumber uint64, params Params) JobID
// WriteStateDiffFor method to write state diff object directly to DB
WriteStateDiffFor(blockHash common.Hash, params Params) error
// WriteLoop event loop for progressively processing and writing diffs directly to DB
WriteLoop(chainEventCh chan core.ChainEvent)
// Method to change the addresses being watched in write loop params
WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error
// SubscribeWriteStatus method to subscribe to receive state diff processing output
SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool)
// UnsubscribeWriteStatus method to unsubscribe from state diff processing
UnsubscribeWriteStatus(id rpc.ID) error
}
// Service is the underlying struct for the state diffing service
@ -139,6 +144,27 @@ type Service struct {
numWorkers uint
// Number of retry for aborted transactions due to deadlock.
maxRetry uint
// Write job status subscriptions
jobStatusSubs map[rpc.ID]statusSubscription
// Job ID ticker
lastJobID uint64
// In flight jobs (for WriteStateDiffAt)
currentJobs map[uint64]JobID
currentJobsMutex sync.Mutex
}
// IDs used for tracking in-progress jobs (0 for invalid)
type JobID uint64
// JobStatus represents the status of a completed job
type JobStatus struct {
id JobID
err error
}
type statusSubscription struct {
statusChan chan<- JobStatus
quitChan chan<- bool
}
// BlockCache caches the last block for safe access from different service loops
@ -748,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// WriteStateDiffAt writes a state diff at the specific blockheight directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID {
sds.currentJobsMutex.Lock()
defer sds.currentJobsMutex.Unlock()
if id, has := sds.currentJobs[blockNumber]; has {
return id
}
id := JobID(atomic.AddUint64(&sds.lastJobID, 1))
sds.currentJobs[blockNumber] = id
go func() {
err := sds.writeStateDiffAt(blockNumber, params)
sds.currentJobsMutex.Lock()
delete(sds.currentJobs, blockNumber)
sds.currentJobsMutex.Unlock()
for _, sub := range sds.jobStatusSubs {
sub.statusChan <- JobStatus{id, err}
}
}()
return id
}
func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber)
// use watched addresses from statediffing write loop if not provided
@ -852,6 +898,33 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
return err
}
// SubscribeWriteStatus is used by the API to subscribe to the job status updates
func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) {
log.Info("Subscribing to job status updates", "subscription id", id)
sds.Lock()
if sds.jobStatusSubs == nil {
sds.jobStatusSubs = map[rpc.ID]statusSubscription{}
}
sds.jobStatusSubs[id] = statusSubscription{
statusChan: sub,
quitChan: quitChan,
}
sds.Unlock()
}
// UnsubscribeWriteStatus is used to unsubscribe from job status updates
func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error {
log.Info("Unsubscribing from job status updates", "subscription id", id)
sds.Lock()
close(sds.jobStatusSubs[id].quitChan)
delete(sds.jobStatusSubs, id)
if len(sds.jobStatusSubs) == 0 {
sds.jobStatusSubs = nil
}
sds.Unlock()
return nil
}
// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db:
// add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {

View File

@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo
}
// WriteStateDiffAt mock method
func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error {
func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID {
// TODO: something useful here
return nil
return 0
}
// WriteStateDiffFor mock method
@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
return nil
}
// SubscribeWriteStatus is used by the API to subscribe to the job status updates
func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) {
// TODO when WriteStateDiff methods are implemented
}
// UnsubscribeWriteStatus is used to unsubscribe from job status updates
func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error {
// TODO when WriteStateDiff methods are implemented
return nil
}