From ba297e4a35a43d36377a41dfab92e82f49cc418d Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 18 Jan 2023 09:40:17 -0600 Subject: [PATCH] Change behavior of WriteStateDiffAt to async (#306) - returns job ID and provides status update stream - won't start redundant jobs --- statediff/api.go | 53 ++++++++++++++++- statediff/service.go | 79 ++++++++++++++++++++++++- statediff/test_helpers/mocks/service.go | 15 ++++- 3 files changed, 139 insertions(+), 8 deletions(-) diff --git a/statediff/api.go b/statediff/api.go index 55b9b1f9a..09622d20b 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -141,13 +141,12 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN } // WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight -func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error { +func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID { var err error start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber) defer countApiRequestEnd(start, logger, err) - err = api.sds.WriteStateDiffAt(blockNumber, params) - return err + return api.sds.WriteStateDiffAt(blockNumber, params) } // WriteStateDiffFor writes a state diff object directly to DB for the specific block hash @@ -164,3 +163,51 @@ func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error { return api.sds.WatchAddress(operation, args) } + +// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff* +func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for events + rpcSub := notifier.CreateSubscription() + + go func() { + // subscribe to events from the statediff service + statusChan := make(chan JobStatus, chainEventChanSize) + quitChan := make(chan bool, 1) + api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan) + + var err error + defer func() { + if err != nil { + if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil { + log.Error("Failed to unsubscribe from job status stream: " + err.Error()) + } + } + }() + // loop and await payloads and relay them to the subscriber with the notifier + for { + select { + case status := <-statusChan: + if err = notifier.Notify(rpcSub.ID, status); err != nil { + log.Error("Failed to send job status; error: " + err.Error()) + return + } + case err = <-rpcSub.Err(): + if err != nil { + log.Error("State diff service rpcSub error: " + err.Error()) + return + } + case <-quitChan: + // don't need to unsubscribe, service does so before sending the quit signal + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/service.go b/statediff/service.go index a147ca1c9..b5edd19fe 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -87,7 +87,7 @@ type IService interface { APIs() []rpc.API // Loop is the main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) - // Subscribe method to subscribe to receive state diff processing output` + // Subscribe method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Unsubscribe method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error @@ -100,13 +100,18 @@ type IService interface { // StreamCodeAndCodeHash method to stream out all code and codehash pairs StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) // WriteStateDiffAt method to write state diff object directly to DB - WriteStateDiffAt(blockNumber uint64, params Params) error + WriteStateDiffAt(blockNumber uint64, params Params) JobID // WriteStateDiffFor method to write state diff object directly to DB WriteStateDiffFor(blockHash common.Hash, params Params) error // WriteLoop event loop for progressively processing and writing diffs directly to DB WriteLoop(chainEventCh chan core.ChainEvent) // Method to change the addresses being watched in write loop params WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error + + // SubscribeWriteStatus method to subscribe to receive state diff processing output + SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) + // UnsubscribeWriteStatus method to unsubscribe from state diff processing + UnsubscribeWriteStatus(id rpc.ID) error } // Service is the underlying struct for the state diffing service @@ -139,6 +144,27 @@ type Service struct { numWorkers uint // Number of retry for aborted transactions due to deadlock. maxRetry uint + // Write job status subscriptions + jobStatusSubs map[rpc.ID]statusSubscription + // Job ID ticker + lastJobID uint64 + // In flight jobs (for WriteStateDiffAt) + currentJobs map[uint64]JobID + currentJobsMutex sync.Mutex +} + +// IDs used for tracking in-progress jobs (0 for invalid) +type JobID uint64 + +// JobStatus represents the status of a completed job +type JobStatus struct { + id JobID + err error +} + +type statusSubscription struct { + statusChan chan<- JobStatus + quitChan chan<- bool } // BlockCache caches the last block for safe access from different service loops @@ -748,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // WriteStateDiffAt writes a state diff at the specific blockheight directly to the database // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data -func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { +func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID { + sds.currentJobsMutex.Lock() + defer sds.currentJobsMutex.Unlock() + if id, has := sds.currentJobs[blockNumber]; has { + return id + } + id := JobID(atomic.AddUint64(&sds.lastJobID, 1)) + sds.currentJobs[blockNumber] = id + go func() { + err := sds.writeStateDiffAt(blockNumber, params) + sds.currentJobsMutex.Lock() + delete(sds.currentJobs, blockNumber) + sds.currentJobsMutex.Unlock() + for _, sub := range sds.jobStatusSubs { + sub.statusChan <- JobStatus{id, err} + } + }() + return id +} + +func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error { log.Info("writing state diff at", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided @@ -852,6 +898,33 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo return err } +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { + log.Info("Subscribing to job status updates", "subscription id", id) + sds.Lock() + if sds.jobStatusSubs == nil { + sds.jobStatusSubs = map[rpc.ID]statusSubscription{} + } + sds.jobStatusSubs[id] = statusSubscription{ + statusChan: sub, + quitChan: quitChan, + } + sds.Unlock() +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error { + log.Info("Unsubscribing from job status updates", "subscription id", id) + sds.Lock() + close(sds.jobStatusSubs[id].quitChan) + delete(sds.jobStatusSubs, id) + if len(sds.jobStatusSubs) == 0 { + sds.jobStatusSubs = nil + } + sds.Unlock() + return nil +} + // WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // add | remove | set | clear func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { diff --git a/statediff/test_helpers/mocks/service.go b/statediff/test_helpers/mocks/service.go index ab183fddd..1ecd80ec8 100644 --- a/statediff/test_helpers/mocks/service.go +++ b/statediff/test_helpers/mocks/service.go @@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo } // WriteStateDiffAt mock method -func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error { +func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID { // TODO: something useful here - return nil + return 0 } // WriteStateDiffFor mock method @@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a return nil } + +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) { + // TODO when WriteStateDiff methods are implemented +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error { + // TODO when WriteStateDiff methods are implemented + return nil +}