review fixes and fixes for issues ran into in integration

This commit is contained in:
Ian Norden 2019-05-17 10:20:28 -05:00 committed by Rob Mulholand
parent 5fa40688af
commit 65e9874da3
5 changed files with 22 additions and 20 deletions

View File

@ -150,7 +150,7 @@ var (
utils.EVMInterpreterFlag, utils.EVMInterpreterFlag,
utils.StateDiffFlag, utils.StateDiffFlag,
utils.StateDiffPathsAndProofs, utils.StateDiffPathsAndProofs,
utils.StateDiffAllNodeTypes, utils.StateDiffIntermediateNodes,
utils.StateDiffStreamBlock, utils.StateDiffStreamBlock,
utils.StateDiffWatchedAddresses, utils.StateDiffWatchedAddresses,
configFileFlag, configFileFlag,

View File

@ -267,7 +267,7 @@ var AppHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{ Flags: []cli.Flag{
utils.StateDiffFlag, utils.StateDiffFlag,
utils.StateDiffPathsAndProofs, utils.StateDiffPathsAndProofs,
utils.StateDiffAllNodeTypes, utils.StateDiffIntermediateNodes,
utils.StateDiffWatchedAddresses, utils.StateDiffWatchedAddresses,
utils.StateDiffStreamBlock, utils.StateDiffStreamBlock,
}, },

View File

@ -765,11 +765,11 @@ var (
} }
StateDiffPathsAndProofs = cli.BoolFlag{ StateDiffPathsAndProofs = cli.BoolFlag{
Name: "statediff.pathsandproofs", Name: "statediff.pathsandproofs",
Usage: "Set to true to generate paths and proof sets for diffed state and storage trie lead nodes", Usage: "Set to true to generate paths and proof sets for diffed state and storage trie leaf nodes",
} }
StateDiffAllNodeTypes = cli.BoolFlag{ StateDiffIntermediateNodes = cli.BoolFlag{
Name: "statediff.allnodes", Name: "statediff.intermediatenodes",
Usage: "Set to true to consider all node types: leaf, branch, and extension; default (false) processes leaf nodes only", Usage: "Set to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only",
} }
StateDiffWatchedAddresses = cli.StringSliceFlag{ StateDiffWatchedAddresses = cli.StringSliceFlag{
Name: "statediff.watchedaddresses", Name: "statediff.watchedaddresses",
@ -777,7 +777,7 @@ var (
} }
StateDiffStreamBlock = cli.BoolFlag{ StateDiffStreamBlock = cli.BoolFlag{
Name: "statediff.streamblock", Name: "statediff.streamblock",
Usage: "Set to true to stream the block data alongside state diff data", Usage: "Set to true to stream the block data alongside state diff data in the same subscription payload",
} }
) )
@ -1642,7 +1642,7 @@ func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) {
config := statediff.Config{ config := statediff.Config{
StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name), StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name),
PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name), PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name),
AllNodes: ctx.GlobalBool(StateDiffAllNodeTypes.Name), AllNodes: ctx.GlobalBool(StateDiffIntermediateNodes.Name),
WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name), WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name),
} }
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {

View File

@ -44,7 +44,7 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
} }
// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created // Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created
func (api *PublicStateDiffAPI) Subscribe(ctx context.Context) (*rpc.Subscription, error) { func (api *PublicStateDiffAPI) Subscribe(ctx context.Context, payloadChan chan Payload) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
@ -56,19 +56,21 @@ func (api *PublicStateDiffAPI) Subscribe(ctx context.Context) (*rpc.Subscription
go func() { go func() {
// subscribe to events from the state diff service // subscribe to events from the state diff service
payloadChannel := make(chan Payload) payloadChannel := make(chan Payload, 10)
quitChan := make(chan bool) quitChan := make(chan bool)
api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan)
// loop and await state diff payloads and relay them to the subscriber with the notifier
// loop and await state diff payloads and relay them to the subscriber with then notifier
for { for {
select { select {
case packet := <-payloadChannel: case packet := <-payloadChannel:
if err := notifier.Notify(rpcSub.ID, packet); err != nil { if err := notifier.Notify(rpcSub.ID, packet); err != nil {
log.Error("Failed to send state diff packet", "err", err) log.Error("Failed to send state diff packet", "err", err)
} }
case <-rpcSub.Err(): case err := <-rpcSub.Err():
err := api.sds.Unsubscribe(rpcSub.ID) log.Error("State diff service rpcSub error", err)
println("err")
println(err.Error())
err = api.sds.Unsubscribe(rpcSub.ID)
if err != nil { if err != nil {
log.Error("Failed to unsubscribe from the state diff service", err) log.Error("Failed to unsubscribe from the state diff service", err)
} }

View File

@ -129,11 +129,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err)
} }
case err := <-errCh: case err := <-errCh:
log.Warn("Error from chain event subscription, breaking loop.", "error", err) log.Warn("Error from chain event subscription, breaking loop", "error", err)
sds.close() sds.close()
return return
case <-sds.QuitChan: case <-sds.QuitChan:
log.Info("Quitting the statediff block channel") log.Info("Quitting the statediffing process")
sds.close() sds.close()
return return
} }
@ -214,9 +214,9 @@ func (sds *Service) send(payload Payload) {
for id, sub := range sds.Subscriptions { for id, sub := range sds.Subscriptions {
select { select {
case sub.PayloadChan <- payload: case sub.PayloadChan <- payload:
log.Info("sending state diff payload to subscription %s", id) log.Info(fmt.Sprintf("sending state diff payload to subscription %s", id))
default: default:
log.Info("unable to send payload to subscription %s; channel has no receiver", id) log.Info(fmt.Sprintf("unable to send payload to subscription %s", id))
} }
} }
sds.Unlock() sds.Unlock()
@ -229,9 +229,9 @@ func (sds *Service) close() {
select { select {
case sub.QuitChan <- true: case sub.QuitChan <- true:
delete(sds.Subscriptions, id) delete(sds.Subscriptions, id)
log.Info("closing subscription %s", id) log.Info(fmt.Sprintf("closing subscription %s", id))
default: default:
log.Info("unable to close subscription %s; channel has no receiver", id) log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id))
} }
} }
sds.Unlock() sds.Unlock()