Rpc statediffing #10
Reference: cerc-io/go-ethereum#10
This is the branch that the seed node needs to work with.
Don't mind Travis...
Does utils.StateDiffWatchedAddresses need to be in here too?
Is the idea that the StateDiffWatchedAddresses.Name should be a slice of all the addresses to be watched? Has this been implemented yet? I couldn't quite figure out where this watching was being done.
Could you remind me what this config does? Is this where we're determining if the trie should be pruned?
:%s/then/the ?
Where is lastUsed used?
I wonder if we can extract each of these branches in the if/else block into its own method: collectPathAndProofs, collectOnlyLeaves, etc.
Also wondering if each of these if/else branches could be pulled out into their own methods, so that buildStorageDiffsFromTrie is a bit easier to read from a high level.
Hey @elizabethengelman thank you for the review!! Been rushing to get this demo working so I haven't had time to look over everything here yet. I'm fairly certain I touched on a number of these issues in my latest commits but I will revisit this after the demo.
Sure does, added it!
I've refactored this quite a bit, it's still one method but much more concise. Might still be best to extract into separate methods but I think we would need 4 different methods to cover the different combinations of processing (leafs only, leafs with their proofs and paths, all nodes, all nodes + leaf proof and paths)
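The four combinations listed above can be pictured as a small dispatch over two settings. This is only a sketch: the type processingMode, its constants, and the two boolean parameters are hypothetical names chosen for illustration and are not taken from the branch.

```go
package main

import "fmt"

// processingMode is a hypothetical name for the four combinations
// mentioned in the comment above.
type processingMode int

const (
	leafsOnly processingMode = iota
	leafsWithPathsAndProofs
	allNodes
	allNodesWithLeafPathsAndProofs
)

// selectMode maps two hypothetical config booleans onto one of the
// four combinations, so each combination could get its own method.
func selectMode(intermediateNodes, pathsAndProofs bool) processingMode {
	switch {
	case intermediateNodes && pathsAndProofs:
		return allNodesWithLeafPathsAndProofs
	case intermediateNodes:
		return allNodes
	case pathsAndProofs:
		return leafsWithPathsAndProofs
	default:
		return leafsOnly
	}
}

func main() {
	fmt.Println(selectMode(false, false) == leafsOnly)
	fmt.Println(selectMode(true, true) == allNodesWithLeafPathsAndProofs)
}
```

With a dispatch like this, the builder could call one small method per mode instead of branching inline.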
Did similar refactoring as above, let me know what you think! Separate methods might still be the best option.
It isn't, neither is the mutex 🙃 got rid of them both.
I don't think it had been implemented yet at the time of this comment, but yes the StateDiffWatchedAddresses.Name is used to load this StringSliceFlag, which is a slice of addresses to watch, e.g.
./geth --statediff --statediff.streamblock --ws --syncmode "full" --statediff.watchedaddresses 0xfakeaddr1,0xfakeaddr2,0xfakeaddr3
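As a rough illustration of how a comma-separated list like the one above could be turned into a lookup set, here is a standalone sketch. The helpers parseWatched and isWatched are hypothetical, and treating an empty list as "watch everything" is an assumption, not something confirmed in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// parseWatched (hypothetical) splits a comma-separated flag value into
// a set of lower-cased addresses.
func parseWatched(raw string) map[string]bool {
	watched := make(map[string]bool)
	for _, addr := range strings.Split(raw, ",") {
		addr = strings.ToLower(strings.TrimSpace(addr))
		if addr != "" {
			watched[addr] = true
		}
	}
	return watched
}

// isWatched (hypothetical) reports whether addr should be diffed.
// Assumption: an empty set means no filtering is applied.
func isWatched(watched map[string]bool, addr string) bool {
	if len(watched) == 0 {
		return true
	}
	return watched[strings.ToLower(addr)]
}

func main() {
	watched := parseWatched("0xfakeaddr1,0xfakeaddr2,0xfakeaddr3")
	fmt.Println(isWatched(watched, "0xFAKEADDR2"))
	fmt.Println(isWatched(watched, "0xother"))
}
```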
Yeah this is where GC/pruning can be turned completely off- setting to true = archival node.
Overall this is looking really good!
I wonder if it would be helpful to include some additional documentation at some point, because I'm a bit unclear on how one is able to watch the RPC connection to get the statediffs off of it.
Super nitpicky, but can we put these in the same order as they're listed in main?
Same here, with regards to the ordering of the config.
I wonder if the config field AllNodes and the CLI flag name StateDiffIntermediateNodes should be more similar, to reduce confusion. I slightly prefer using IntermediateNodes here, to be clear that they will be included as opposed to having to decipher what AllNodes implies.
👍
cool, thanks!
do we want to check these print statements in, in addition to the error logging?
@ -0,0 +182,4 @@
}
// Subscribe is used by the API to subscribe to the StateDiffingService loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
For some reason, I've had a tough time creating and keeping a mental model in my head, so I sketched it out. Does this seem about right to you?
@ -0,0 +214,4 @@
}
// Start is used to begin the StateDiffingService
func (sds *Service) Start(*p2p.Server) error {
I haven't looked at the geth startup process in a while, but when starting services, is there anything that makes sure that the subscribers have called Subscribe first, before Start is called?
I think this file may be able to be deleted, since statediff/publisher/publisher.go was removed.
I may be missing something, but I only see this MockStateDiffService in statediff/testhelpers/mocks/service_test.go, which seems to be testing the mock. Is the intention that this mock can also be used to test the API portion or something?
Is this file testing the mock service or the API? I wonder if the file should be renamed?
Yes, definitely!
No, good catch. Will remove these!
@ -0,0 +182,4 @@
}
// Subscribe is used by the API to subscribe to the StateDiffingService loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
Yeah this looks awesome! Thanks for putting this together. The only thing I catch that is different is that the call to Start actually happens when the node and its services are first spun up. I think this should probably be changed, since there is no point in performing the state diff processing if nobody is listening for them yet (previously since it was writing to CSVs this start up mode made more sense).
@ -0,0 +214,4 @@
}
// Start is used to begin the StateDiffingService
func (sds *Service) Start(*p2p.Server) error {
Nope, the service currently starts up whether or not there are subscribers. This is a bit of an artifact from the old mode of operation when writing to CSVs, should adjust it to wait for a subscription to start the processing.
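One way the adjustment described above could look is to keep the loop running but skip the expensive work while nobody is subscribed. This is a self-contained sketch, not the service's actual code: the service type, its string payloads, and the method names are simplified stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a simplified, hypothetical stand-in for the state diffing service.
type service struct {
	mu          sync.Mutex
	subscribers map[string]chan<- string
}

func newService() *service {
	return &service{subscribers: make(map[string]chan<- string)}
}

// subscribe registers a channel under the given id.
func (s *service) subscribe(id string, ch chan<- string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.subscribers[id] = ch
}

// handle processes one chain event and reports whether any work was done.
// With no subscribers it returns early, before building a payload.
func (s *service) handle(event string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.subscribers) == 0 {
		return false // nobody is listening, skip the processing
	}
	payload := "state diff for " + event // placeholder for the real diff
	for _, sub := range s.subscribers {
		select {
		case sub <- payload:
		default: // drop instead of blocking on a slow subscriber
		}
	}
	return true
}

func main() {
	s := newService()
	fmt.Println(s.handle("block 1")) // no subscribers yet
	ch := make(chan string, 1)
	s.subscribe("sub-1", ch)
	fmt.Println(s.handle("block 2"))
	fmt.Println(<-ch)
}
```

Dropping payloads for a full channel is a design choice made for the sketch; a real service might prefer to block or to close the slow subscription.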
This mock is used to test the API and service together; the service_test that uses it has to be moved in here to prevent a disallowed import cycle (a disadvantage to the really flat package organization that geth uses). The service_test in the top level statediff package uses the real service, which has been configured with all mocks; the service_test in statediff/testhelpers/mocks/service_test.go uses this MockStateDiffService, which allows us to pass in current and parent blocks by its channels.
We are reusing the same patterns as the rest of the services and APIs in Geth so we shouldn't need to write tests for everything, so this test might be superfluous (when comparing the test we have to the test coverage in other service packages).
Sorry, see above, this is confusing and sloppy and only organized like this because of a disallowed import cycle if this test is moved up to the statediff package into the other service_test file.
Hey @elizabethengelman thanks again for the review! I can provide some additional docs. I'm not entirely sure where they are appropriate because the rpc patterns we are using are the same throughout geth and the general documentation for that is here. That doc is severely lacking though.
@ -0,0 +182,4 @@
}
// Subscribe is used by the API to subscribe to the StateDiffingService loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
So Start is still called at startup, as is normal for these services, but I've adjusted the processing loop so that it only performs the state diff processing if there is at least one subscriber listening.
For some reason I can't see the conversation here anymore (it says there are 34 comments)... I'm going to clean this up and reopen it, and prepare a PR to master Geth.
Super small thing, but I wonder if this method could be renamed rootAllowedToBeDereferenced. For some reason, that seems to make more sense in my head. Also, I realize that I maybe wrote this method 😆, so I'd be happy to change it myself as well.
@ -0,0 +65,4 @@
case packet := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
I'm not sure exactly how Notify could potentially fail, but I wonder if we want to wait to unsubscribe until there are multiple failures? I haven't thought this through all the way yet, and would totally be cool with punting on this question for now, but wanted to bring it up so that we can think on it.
If subscribing over RPC, I think we need to pass the --rpc flag when starting geth up, right? Maybe we can include that here too.
@ -0,0 +43,4 @@
e.g.
cli, _ := rpc.Dial("ipcPathOrWsURL")
🎉 thanks for including this here, it's super helpful!
@ -0,0 +87,4 @@
}
// Find all the diffed keys
createKeys := sortKeys(creations)
Could you remind me why we need to sort the keys?
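For context on this question: a two-index walk over two key slices only finds every shared key when both slices are sorted. The sketch below is a minimal illustration of that technique. intersectSorted is a hypothetical stand-in and is not the branch's findIntersection; it only borrows the iOfA/iOfB naming and the use of strings.Compare mentioned in this thread.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// intersectSorted (hypothetical) returns the keys present in both a and b.
// It assumes both slices are already sorted in ascending order.
func intersectSorted(a, b []string) []string {
	var out []string
	iOfA, iOfB := 0, 0
	for iOfA < len(a) && iOfB < len(b) {
		switch strings.Compare(a[iOfA], b[iOfB]) {
		case -1: // a[iOfA] sorts first, so it cannot appear later in b
			iOfA++
		case 1: // b[iOfB] sorts first, so it cannot appear later in a
			iOfB++
		default: // 0: the keys are equal, record the match and advance both
			out = append(out, a[iOfA])
			iOfA++
			iOfB++
		}
	}
	return out
}

func main() {
	a := []string{"c", "a", "b"}
	b := []string{"b", "d", "c"}
	sort.Strings(a)
	sort.Strings(b)
	fmt.Println(intersectSorted(a, b)) // prints [b c]
}
```

If the inputs are not sorted, the walk can skip past a key that appears later in the other slice: with a = ["c", "a"] and b = ["a", "c"] it reports only "c" and misses "a".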
Is iOfA short for indexOfA?
Are these comments just explaining what the different results from strings.Compare mean? If so, that's super helpful! I wonder if we could format them differently, or add some additional text to the comments - it took me a minute to realize that these were indeed comments, and not just commented out dead code.
@ -0,0 +34,4 @@
"github.com/ethereum/go-ethereum/rpc"
)
const chainEventChanSize = 20000
Where does 20000 come from?
@ -0,0 +122,4 @@
currentBlock := chainEvent.Block
parentHash := currentBlock.ParentHash()
var parentBlock *types.Block
if sds.lastBlock != nil && bytes.Equal(sds.lastBlock.Hash().Bytes(), currentBlock.ParentHash().Bytes()) {
I like this validation that the last block is in fact the parent block! Are we doing this first, instead of going straight to GetBlockByHash, to potentially avoid doing another RPC call if possible?
Isn't this send method sending data to the rpc subscriptions as well?
%s/reeipt/receipt
@ -0,0 +44,4 @@
// StateDiff is the final output structure from the builder
type StateDiff struct {
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
There are some small spacing issues here and with the AccountDiff and StorageDiff structs.
Looks good to me! :shipit: Just a few small comments and questions, but I'd be comfortable with merging this into the statediffing branch whenever you're ready.
This method isn't being used anymore, and can probably be deleted.
I think this was being used previously to get the contract's address. But from what I remember the sdb.chainDB.Get call will only return the address with an archive node, which is partially why we're currently not able to get the address.
I believe so! I didn't write this method, or at least I don't remember writing it 🙃
I agree! Looking through this method it definitely needs some cleaning up, I think this is left over from the quorum code we copied over. The comments are explaining the different results from strings.Compare, but it still isn't super clear how/why we are doing the things we do.
@ -0,0 +34,4 @@
"github.com/ethereum/go-ethereum/rpc"
)
const chainEventChanSize = 20000
I'm getting that number from here and here
@ -0,0 +122,4 @@
currentBlock := chainEvent.Block
parentHash := currentBlock.ParentHash()
var parentBlock *types.Block
if sds.lastBlock != nil && bytes.Equal(sds.lastBlock.Hash().Bytes(), currentBlock.ParentHash().Bytes()) {
Yeah that's the idea! I'm not sure this is really necessary; now that I think about it more, we don't do an rpc call to do GetBlockByHash since the blockchain is an internal component... but I still think this quick check against the cached block is faster than reaching down and doing a lookup in the blockchain.
Yes, thank you! That is poor wording on my part. It sends to rpc subscriptions, whether or not those subscriptions are over websocket or ipc.
@ -0,0 +87,4 @@
}
// Find all the diffed keys
createKeys := sortKeys(creations)
As I understand it we need to sort the keys before doing findIntersection on them, so that the findIntersection method is comparing equivalent keys (since it is using the iOfA and iOfB to iterate through and compare, the keys need to be sorted by index).
I seem to be able to get away with
./geth --statediff --statediff.streamblock --ws --syncmode "full"
I think the --rpc flag turns on the HTTP-RPC server, which we don't use here (we either need to use the IPC-RPC server which is on by default or the WS-RPC server which we turn on with the --ws flag).
@ -0,0 +65,4 @@
case packet := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
That's a really good point! I will need to think about this more, and take a closer look at the notifier, but my gut tells me you are right and that there are failures we will want to ignore.
Definitely, will update! That makes more sense to me too.
Thank you for bringing this up! I'll get rid of this, and take a closer look to see if we can do something similar with a full node...
Thanks for all the helpful reviews @elizabethengelman!! I'm going to make the changes you mention, and then I'll merge into statediffing. After that I'll begin the official PR to geth:master. I will definitely want your input on how to best structure/frame/word that PR. What's the best way to correspond on that (since we won't want to open the PR til we have it ready)? I'll throw together a draft and send it to you over slack/discord? I'm on vacation in the boonies this week so I won't get around to that until at least tomorrow afternoon when I have internet access again.
@ -0,0 +65,4 @@
case packet := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
This makes it sound like we should unsubscribe because the rpc connection is closed when an error occurs. But that's actually only true if the error occurs here, if the error is from here then the connection isn't closed so we could wait for more failures.
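The distinction drawn above (unsubscribe right away when the connection is gone, otherwise tolerate a few failures) could be sketched as below. Everything here is hypothetical: the sentinel errors, the threshold of 3, and the failureTracker type are illustrative and do not reflect how the geth notifier actually reports errors.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for the two failure sources.
var (
	errConnectionClosed = errors.New("connection closed")
	errTransient        = errors.New("transient notify failure")
)

// maxConsecutiveFailures is an assumed threshold, not a value from the PR.
const maxConsecutiveFailures = 3

// failureTracker counts consecutive notify failures for one subscription.
type failureTracker struct {
	consecutive int
}

// shouldUnsubscribe records the result of one notify attempt and reports
// whether the subscription should be dropped.
func (f *failureTracker) shouldUnsubscribe(err error) bool {
	if err == nil {
		f.consecutive = 0 // a success resets the streak
		return false
	}
	if errors.Is(err, errConnectionClosed) {
		return true // the connection is gone, retrying cannot help
	}
	f.consecutive++
	return f.consecutive >= maxConsecutiveFailures
}

func main() {
	var tracker failureTracker
	fmt.Println(tracker.shouldUnsubscribe(errTransient))
	fmt.Println(tracker.shouldUnsubscribe(errConnectionClosed))
}
```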