From 339d7bfe7e503f31cd7f45fe55ffdee876cf6442 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 13 Jan 2023 14:59:18 -0600 Subject: [PATCH 1/4] Add --statediff.db.logstatements option. (#307) * Add --statediff.db.logstatements option. --- cmd/geth/config.go | 3 + cmd/geth/main.go | 1 + cmd/utils/flags.go | 5 ++ .../indexer/database/sql/postgres/config.go | 1 + .../database/sql/postgres/log_adapter.go | 61 +++++++++++++++++++ .../indexer/database/sql/postgres/pgx.go | 7 +++ 6 files changed, 78 insertions(+) create mode 100644 statediff/indexer/database/sql/postgres/log_adapter.go diff --git a/cmd/geth/config.go b/cmd/geth/config.go index c60b09efe..ae5332e30 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -265,6 +265,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffDBConnTimeout.Name) { pgConfig.ConnTimeout = time.Duration(ctx.Duration(utils.StateDiffDBConnTimeout.Name).Seconds()) } + if ctx.IsSet(utils.StateDiffLogStatements.Name) { + pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 696b7be26..16c549fed 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -179,6 +179,7 @@ var ( utils.StateDiffWaitForSync, utils.StateDiffWatchedAddressesFilePath, utils.StateDiffUpsert, + utils.StateDiffLogStatements, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 070c00409..2bdb3f8e1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1078,6 +1078,11 @@ var ( Usage: "Should the statediff service overwrite data existing in the database?", Value: false, } + StateDiffLogStatements = &cli.BoolFlag{ + Name: "statediff.db.logstatements", + Usage: "Should the statediff service log all database statements?", + Value: false, + } StateDiffWritingFlag = 
&cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 07c426811..b5cdc02ab 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -70,6 +70,7 @@ type Config struct { MaxConnIdleTime time.Duration MaxConnLifetime time.Duration ConnTimeout time.Duration + LogStatements bool // node info params ID string diff --git a/statediff/indexer/database/sql/postgres/log_adapter.go b/statediff/indexer/database/sql/postgres/log_adapter.go new file mode 100644 index 000000000..c3ceead46 --- /dev/null +++ b/statediff/indexer/database/sql/postgres/log_adapter.go @@ -0,0 +1,61 @@ +// Copyright © 2023 Cerc + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +package postgres + +import ( + "context" + + "github.com/ethereum/go-ethereum/log" + "github.com/jackc/pgx/v4" +) + +type LogAdapter struct { + l log.Logger +} + +func NewLogAdapter(l log.Logger) *LogAdapter { + return &LogAdapter{l: l} +} + +func (l *LogAdapter) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { + var logger log.Logger + if data != nil { + var args = make([]interface{}, 0) + for key, value := range data { + if value != nil { + args = append(args, key, value) + } + } + logger = l.l.New(args...) + } else { + logger = l.l + } + + switch level { + case pgx.LogLevelTrace: + logger.Trace(msg) + case pgx.LogLevelDebug: + logger.Debug(msg) + case pgx.LogLevelInfo: + logger.Info(msg) + case pgx.LogLevelWarn: + logger.Warn(msg) + case pgx.LogLevelError: + logger.Error(msg) + default: + logger.New("INVALID_PGX_LOG_LEVEL", level).Error(msg) + } +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 936a3765d..9f1c4d571 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -20,6 +20,8 @@ import ( "context" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/georgysavva/scany/pgxscan" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" @@ -85,6 +87,11 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { if config.MaxConnIdleTime != 0 { conf.MaxConnIdleTime = config.MaxConnIdleTime } + + if config.LogStatements { + conf.ConnConfig.Logger = NewLogAdapter(log.New()) + } + return conf, nil } -- 2.45.2 From 907bf3d2f18b8b6454479e13adc4472d74e4d8da Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 13 Jan 2023 15:37:04 -0600 Subject: [PATCH 2/4] Move message from Debug to Trace. 
(#308) --- statediff/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statediff/builder.go b/statediff/builder.go index c3ecbf117..58cba37f4 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -727,7 +727,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { return nil } - log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) + log.Trace("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) oldTrie, err := sdb.StateCache.OpenTrie(oldSR) if err != nil { return err -- 2.45.2 From fe49b8a23ce3a20f6758d770f5bc4388391e9bd3 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 13 Jan 2023 16:32:45 -0600 Subject: [PATCH 3/4] Add driver detail to flag. (#310) --- cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2bdb3f8e1..9f2624d3c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1080,7 +1080,7 @@ var ( } StateDiffLogStatements = &cli.BoolFlag{ Name: "statediff.db.logstatements", - Usage: "Should the statediff service log all database statements?", + Usage: "Should the statediff service log all database statements? 
(Note: pgx only)", Value: false, } StateDiffWritingFlag = &cli.BoolFlag{ -- 2.45.2 From 0760de23f6b15cf78a8d076dfc3bad1a3afde390 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 12 Jan 2023 21:41:34 -0600 Subject: [PATCH 4/4] Change behavior of WriteStateDiffAt to async - returns job ID and provides status update stream - won't start redundant jobs --- statediff/api.go | 53 ++++++++++++++++- statediff/service.go | 79 ++++++++++++++++++++++++- statediff/test_helpers/mocks/service.go | 15 ++++- 3 files changed, 139 insertions(+), 8 deletions(-) diff --git a/statediff/api.go b/statediff/api.go index 55b9b1f9a..09622d20b 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -141,13 +141,12 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN } // WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight -func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error { +func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID { var err error start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber) defer countApiRequestEnd(start, logger, err) - err = api.sds.WriteStateDiffAt(blockNumber, params) - return err + return api.sds.WriteStateDiffAt(blockNumber, params) } // WriteStateDiffFor writes a state diff object directly to DB for the specific block hash @@ -164,3 +163,51 @@ func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error { return api.sds.WatchAddress(operation, args) } + +// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff* +func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := 
rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for events + rpcSub := notifier.CreateSubscription() + + go func() { + // subscribe to events from the statediff service + statusChan := make(chan JobStatus, chainEventChanSize) + quitChan := make(chan bool, 1) + api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan) + + var err error + defer func() { + if err != nil { + if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil { + log.Error("Failed to unsubscribe from job status stream: " + err.Error()) + } + } + }() + // loop and await payloads and relay them to the subscriber with the notifier + for { + select { + case status := <-statusChan: + if err = notifier.Notify(rpcSub.ID, status); err != nil { + log.Error("Failed to send job status; error: " + err.Error()) + return + } + case err = <-rpcSub.Err(): + if err != nil { + log.Error("State diff service rpcSub error: " + err.Error()) + return + } + case <-quitChan: + // don't need to unsubscribe, service does so before sending the quit signal + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/service.go b/statediff/service.go index a147ca1c9..b5edd19fe 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -87,7 +87,7 @@ type IService interface { APIs() []rpc.API // Loop is the main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) - // Subscribe method to subscribe to receive state diff processing output` + // Subscribe method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Unsubscribe method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error @@ -100,13 +100,18 @@ type IService interface { // StreamCodeAndCodeHash method to stream out all code and codehash pairs StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, 
quitChan chan<- bool) // WriteStateDiffAt method to write state diff object directly to DB - WriteStateDiffAt(blockNumber uint64, params Params) error + WriteStateDiffAt(blockNumber uint64, params Params) JobID // WriteStateDiffFor method to write state diff object directly to DB WriteStateDiffFor(blockHash common.Hash, params Params) error // WriteLoop event loop for progressively processing and writing diffs directly to DB WriteLoop(chainEventCh chan core.ChainEvent) // Method to change the addresses being watched in write loop params WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error + + // SubscribeWriteStatus method to subscribe to job status updates + SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) + // UnsubscribeWriteStatus method to unsubscribe from job status updates + UnsubscribeWriteStatus(id rpc.ID) error } // Service is the underlying struct for the state diffing service @@ -139,6 +144,27 @@ type Service struct { numWorkers uint // Number of retry for aborted transactions due to deadlock. 
maxRetry uint + // Write job status subscriptions + jobStatusSubs map[rpc.ID]statusSubscription + // Job ID ticker + lastJobID uint64 + // In flight jobs (for WriteStateDiffAt) + currentJobs map[uint64]JobID + currentJobsMutex sync.Mutex +} + +// IDs used for tracking in-progress jobs (0 for invalid) +type JobID uint64 + +// JobStatus represents the status of a completed job +type JobStatus struct { + id JobID + err error +} + +type statusSubscription struct { + statusChan chan<- JobStatus + quitChan chan<- bool } // BlockCache caches the last block for safe access from different service loops @@ -748,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // WriteStateDiffAt writes a state diff at the specific blockheight directly to the database // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data -func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { +func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID { + sds.currentJobsMutex.Lock() + defer sds.currentJobsMutex.Unlock() + if id, has := sds.currentJobs[blockNumber]; has { + return id + } + id := JobID(atomic.AddUint64(&sds.lastJobID, 1)) + sds.currentJobs[blockNumber] = id + go func() { + err := sds.writeStateDiffAt(blockNumber, params) + sds.currentJobsMutex.Lock() + delete(sds.currentJobs, blockNumber) + sds.currentJobsMutex.Unlock() + for _, sub := range sds.jobStatusSubs { + sub.statusChan <- JobStatus{id, err} + } + }() + return id +} + +func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error { log.Info("writing state diff at", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided @@ -852,6 +898,33 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo return err } +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func 
(sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { + log.Info("Subscribing to job status updates", "subscription id", id) + sds.Lock() + if sds.jobStatusSubs == nil { + sds.jobStatusSubs = map[rpc.ID]statusSubscription{} + } + sds.jobStatusSubs[id] = statusSubscription{ + statusChan: sub, + quitChan: quitChan, + } + sds.Unlock() +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error { + log.Info("Unsubscribing from job status updates", "subscription id", id) + sds.Lock() + close(sds.jobStatusSubs[id].quitChan) + delete(sds.jobStatusSubs, id) + if len(sds.jobStatusSubs) == 0 { + sds.jobStatusSubs = nil + } + sds.Unlock() + return nil +} + // WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // add | remove | set | clear func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { diff --git a/statediff/test_helpers/mocks/service.go b/statediff/test_helpers/mocks/service.go index ab183fddd..1ecd80ec8 100644 --- a/statediff/test_helpers/mocks/service.go +++ b/statediff/test_helpers/mocks/service.go @@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo } // WriteStateDiffAt mock method -func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error { +func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID { // TODO: something useful here - return nil + return 0 } // WriteStateDiffFor mock method @@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a return nil } + +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) 
{ + // TODO when WriteStateDiff methods are implemented +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error { + // TODO when WriteStateDiff methods are implemented + return nil +} -- 2.45.2