diff --git a/cmd/geth/config.go b/cmd/geth/config.go index ae5332e30..c60b09efe 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -265,9 +265,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffDBConnTimeout.Name) { pgConfig.ConnTimeout = time.Duration(ctx.Duration(utils.StateDiffDBConnTimeout.Name).Seconds()) } - if ctx.IsSet(utils.StateDiffLogStatements.Name) { - pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) - } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 16c549fed..696b7be26 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -179,7 +179,6 @@ var ( utils.StateDiffWaitForSync, utils.StateDiffWatchedAddressesFilePath, utils.StateDiffUpsert, - utils.StateDiffLogStatements, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9f2624d3c..070c00409 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1078,11 +1078,6 @@ var ( Usage: "Should the statediff service overwrite data existing in the database?", Value: false, } - StateDiffLogStatements = &cli.BoolFlag{ - Name: "statediff.db.logstatements", - Usage: "Should the statediff service log all database statements? (Note: pgx only)", - Value: false, - } StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/api.go b/statediff/api.go index 55b9b1f9a..09622d20b 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -141,13 +141,12 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN } // WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight -func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error { +func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID { var err error start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber) defer countApiRequestEnd(start, logger, err) - err = api.sds.WriteStateDiffAt(blockNumber, params) - return err + return api.sds.WriteStateDiffAt(blockNumber, params) } // WriteStateDiffFor writes a state diff object directly to DB for the specific block hash @@ -164,3 +163,51 @@ func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error { return api.sds.WatchAddress(operation, args) } + +// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff* +func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for events + rpcSub := notifier.CreateSubscription() + + go func() { + // subscribe to events from the statediff service + statusChan := make(chan JobStatus, chainEventChanSize) + quitChan := make(chan bool, 1) + api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan) + + var err error + defer func() { + if err != nil { + if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil { + log.Error("Failed to unsubscribe from job status stream: " + err.Error()) + } + } + }() + // loop and await payloads and relay them to the subscriber with the notifier + for { + select { + case status := <-statusChan: + if err = notifier.Notify(rpcSub.ID, status); err != nil { + log.Error("Failed to send job status; error: " + err.Error()) + return + } + case err = <-rpcSub.Err(): + if err != nil { + log.Error("State diff service rpcSub error: " + err.Error()) + return + } + case <-quitChan: + // don't need to unsubscribe, service does so before sending the quit signal + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/builder.go b/statediff/builder.go index 58cba37f4..c3ecbf117 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -727,7 +727,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { return nil } - log.Trace("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) + log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) oldTrie, err := sdb.StateCache.OpenTrie(oldSR) if err != nil { return err diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index b5cdc02ab..07c426811 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -70,7 +70,6 @@ type Config struct { MaxConnIdleTime time.Duration MaxConnLifetime time.Duration ConnTimeout time.Duration - LogStatements bool // node info params ID string diff --git a/statediff/indexer/database/sql/postgres/log_adapter.go b/statediff/indexer/database/sql/postgres/log_adapter.go deleted file mode 100644 index c3ceead46..000000000 --- a/statediff/indexer/database/sql/postgres/log_adapter.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright © 2023 Cerc - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package postgres - -import ( - "context" - - "github.com/ethereum/go-ethereum/log" - "github.com/jackc/pgx/v4" -) - -type LogAdapter struct { - l log.Logger -} - -func NewLogAdapter(l log.Logger) *LogAdapter { - return &LogAdapter{l: l} -} - -func (l *LogAdapter) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { - var logger log.Logger - if data != nil { - var args = make([]interface{}, 0) - for key, value := range data { - if value != nil { - args = append(args, key, value) - } - } - logger = l.l.New(args...) - } else { - logger = l.l - } - - switch level { - case pgx.LogLevelTrace: - logger.Trace(msg) - case pgx.LogLevelDebug: - logger.Debug(msg) - case pgx.LogLevelInfo: - logger.Info(msg) - case pgx.LogLevelWarn: - logger.Warn(msg) - case pgx.LogLevelError: - logger.Error(msg) - default: - logger.New("INVALID_PGX_LOG_LEVEL", level).Error(msg) - } -} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 9f1c4d571..936a3765d 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -20,8 +20,6 @@ import ( "context" "time" - "github.com/ethereum/go-ethereum/log" - "github.com/georgysavva/scany/pgxscan" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" @@ -87,11 +85,6 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { if config.MaxConnIdleTime != 0 { conf.MaxConnIdleTime = config.MaxConnIdleTime } - - if config.LogStatements { - conf.ConnConfig.Logger = NewLogAdapter(log.New()) - } - return conf, nil } diff --git a/statediff/service.go b/statediff/service.go index a147ca1c9..b5edd19fe 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -87,7 +87,7 @@ type IService interface { APIs() []rpc.API // Loop is the main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) - // Subscribe method to subscribe to receive state diff processing output` + // Subscribe method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Unsubscribe method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error @@ -100,13 +100,18 @@ type IService interface { // StreamCodeAndCodeHash method to stream out all code and codehash pairs StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) // WriteStateDiffAt method to write state diff object directly to DB - WriteStateDiffAt(blockNumber uint64, params Params) error + WriteStateDiffAt(blockNumber uint64, params Params) JobID // WriteStateDiffFor method to write state diff object directly to DB WriteStateDiffFor(blockHash common.Hash, params Params) error // WriteLoop event loop for progressively processing and writing diffs directly to DB WriteLoop(chainEventCh chan core.ChainEvent) // Method to change the addresses being watched in write loop params WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error + + // SubscribeWriteStatus method to subscribe to receive state diff processing output + SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) + // UnsubscribeWriteStatus method to unsubscribe from state diff processing + UnsubscribeWriteStatus(id rpc.ID) error } // Service is the underlying struct for the state diffing service @@ -139,6 +144,27 @@ type Service struct { numWorkers uint // Number of retry for aborted transactions due to deadlock. maxRetry uint + // Write job status subscriptions + jobStatusSubs map[rpc.ID]statusSubscription + // Job ID ticker + lastJobID uint64 + // In flight jobs (for WriteStateDiffAt) + currentJobs map[uint64]JobID + currentJobsMutex sync.Mutex +} + +// IDs used for tracking in-progress jobs (0 for invalid) +type JobID uint64 + +// JobStatus represents the status of a completed job +type JobStatus struct { + id JobID + err error +} + +type statusSubscription struct { + statusChan chan<- JobStatus + quitChan chan<- bool } // BlockCache caches the last block for safe access from different service loops @@ -748,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // WriteStateDiffAt writes a state diff at the specific blockheight directly to the database // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data -func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { +func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID { + sds.currentJobsMutex.Lock() + defer sds.currentJobsMutex.Unlock() + if id, has := sds.currentJobs[blockNumber]; has { + return id + } + id := JobID(atomic.AddUint64(&sds.lastJobID, 1)) + sds.currentJobs[blockNumber] = id + go func() { + err := sds.writeStateDiffAt(blockNumber, params) + sds.currentJobsMutex.Lock() + delete(sds.currentJobs, blockNumber) + sds.currentJobsMutex.Unlock() + for _, sub := range sds.jobStatusSubs { + sub.statusChan <- JobStatus{id, err} + } + }() + return id +} + +func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error { log.Info("writing state diff at", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided @@ -852,6 +898,33 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo return err } +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { + log.Info("Subscribing to job status updates", "subscription id", id) + sds.Lock() + if sds.jobStatusSubs == nil { + sds.jobStatusSubs = map[rpc.ID]statusSubscription{} + } + sds.jobStatusSubs[id] = statusSubscription{ + statusChan: sub, + quitChan: quitChan, + } + sds.Unlock() +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error { + log.Info("Unsubscribing from job status updates", "subscription id", id) + sds.Lock() + close(sds.jobStatusSubs[id].quitChan) + delete(sds.jobStatusSubs, id) + if len(sds.jobStatusSubs) == 0 { + sds.jobStatusSubs = nil + } + sds.Unlock() + return nil +} + // WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // add | remove | set | clear func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { diff --git a/statediff/test_helpers/mocks/service.go b/statediff/test_helpers/mocks/service.go index ab183fddd..1ecd80ec8 100644 --- a/statediff/test_helpers/mocks/service.go +++ b/statediff/test_helpers/mocks/service.go @@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo } // WriteStateDiffAt mock method -func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error { +func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID { // TODO: something useful here - return nil + return 0 } // WriteStateDiffFor mock method @@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a return nil } + +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) { + // TODO when WriteStateDiff methods are implemented +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error { + // TODO when WriteStateDiff methods are implemented + return nil +}