Merge pull request #14540 from bas-vk/whisper-api

whisperv5: integrate whisper and implement API
This commit is contained in:
Péter Szilágyi 2017-06-26 13:44:35 +03:00 committed by GitHub
commit feb2932706
31 changed files with 1704 additions and 1257 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/naoina/toml" "github.com/naoina/toml"
) )
@ -42,7 +43,7 @@ var (
Name: "dumpconfig", Name: "dumpconfig",
Usage: "Show configuration values", Usage: "Show configuration values",
ArgsUsage: "", ArgsUsage: "",
Flags: append(nodeFlags, rpcFlags...), Flags: append(append(nodeFlags, rpcFlags...), whisperFlags...),
Category: "MISCELLANEOUS COMMANDS", Category: "MISCELLANEOUS COMMANDS",
Description: `The dumpconfig command shows configuration values.`, Description: `The dumpconfig command shows configuration values.`,
} }
@ -76,6 +77,7 @@ type ethstatsConfig struct {
type gethConfig struct { type gethConfig struct {
Eth eth.Config Eth eth.Config
Shh whisper.Config
Node node.Config Node node.Config
Ethstats ethstatsConfig Ethstats ethstatsConfig
} }
@ -99,8 +101,8 @@ func defaultNodeConfig() node.Config {
cfg := node.DefaultConfig cfg := node.DefaultConfig
cfg.Name = clientIdentifier cfg.Name = clientIdentifier
cfg.Version = params.VersionWithCommit(gitCommit) cfg.Version = params.VersionWithCommit(gitCommit)
cfg.HTTPModules = append(cfg.HTTPModules, "eth") cfg.HTTPModules = append(cfg.HTTPModules, "eth", "shh")
cfg.WSModules = append(cfg.WSModules, "eth") cfg.WSModules = append(cfg.WSModules, "eth", "shh")
cfg.IPCPath = "geth.ipc" cfg.IPCPath = "geth.ipc"
return cfg return cfg
} }
@ -109,6 +111,7 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
// Load defaults. // Load defaults.
cfg := gethConfig{ cfg := gethConfig{
Eth: eth.DefaultConfig, Eth: eth.DefaultConfig,
Shh: whisper.DefaultConfig,
Node: defaultNodeConfig(), Node: defaultNodeConfig(),
} }
@ -130,19 +133,37 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name) cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
} }
utils.SetShhConfig(ctx, stack, &cfg.Shh)
return stack, cfg return stack, cfg
} }
// enableWhisper returns true in case one of the whisper flags is set.
func enableWhisper(ctx *cli.Context) bool {
for _, flag := range whisperFlags {
if ctx.GlobalIsSet(flag.GetName()) {
return true
}
}
return false
}
func makeFullNode(ctx *cli.Context) *node.Node { func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx) stack, cfg := makeConfigNode(ctx)
utils.RegisterEthService(stack, &cfg.Eth) utils.RegisterEthService(stack, &cfg.Eth)
// Whisper must be explicitly enabled, but is auto-enabled in --dev mode. // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := ctx.GlobalBool(utils.WhisperEnabledFlag.Name) shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DevModeFlag.Name) shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DevModeFlag.Name)
if shhEnabled || shhAutoEnabled { if shhEnabled || shhAutoEnabled {
utils.RegisterShhService(stack) if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
utils.RegisterShhService(stack, &cfg.Shh)
} }
// Add the Ethereum Stats daemon if requested. // Add the Ethereum Stats daemon if requested.

View File

@ -35,7 +35,7 @@ var (
Action: utils.MigrateFlags(localConsole), Action: utils.MigrateFlags(localConsole),
Name: "console", Name: "console",
Usage: "Start an interactive JavaScript environment", Usage: "Start an interactive JavaScript environment",
Flags: append(append(nodeFlags, rpcFlags...), consoleFlags...), Flags: append(append(append(nodeFlags, rpcFlags...), consoleFlags...), whisperFlags...),
Category: "CONSOLE COMMANDS", Category: "CONSOLE COMMANDS",
Description: ` Description: `
The Geth console is an interactive shell for the JavaScript runtime environment The Geth console is an interactive shell for the JavaScript runtime environment

View File

@ -95,7 +95,6 @@ var (
utils.NetrestrictFlag, utils.NetrestrictFlag,
utils.NodeKeyFileFlag, utils.NodeKeyFileFlag,
utils.NodeKeyHexFlag, utils.NodeKeyHexFlag,
utils.WhisperEnabledFlag,
utils.DevModeFlag, utils.DevModeFlag,
utils.TestnetFlag, utils.TestnetFlag,
utils.RinkebyFlag, utils.RinkebyFlag,
@ -125,6 +124,12 @@ var (
utils.IPCDisabledFlag, utils.IPCDisabledFlag,
utils.IPCPathFlag, utils.IPCPathFlag,
} }
whisperFlags = []cli.Flag{
utils.WhisperEnabledFlag,
utils.WhisperMaxMessageSizeFlag,
utils.WhisperMinPOWFlag,
}
) )
func init() { func init() {
@ -161,6 +166,7 @@ func init() {
app.Flags = append(app.Flags, rpcFlags...) app.Flags = append(app.Flags, rpcFlags...)
app.Flags = append(app.Flags, consoleFlags...) app.Flags = append(app.Flags, consoleFlags...)
app.Flags = append(app.Flags, debug.Flags...) app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, whisperFlags...)
app.Before = func(ctx *cli.Context) error { app.Before = func(ctx *cli.Context) error {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())

View File

@ -187,6 +187,10 @@ var AppHelpFlagGroups = []flagGroup{
utils.NoCompactionFlag, utils.NoCompactionFlag,
}, debug.Flags...), }, debug.Flags...),
}, },
{
Name: "WHISPER (EXPERIMENTAL)",
Flags: whisperFlags,
},
{ {
Name: "DEPRECATED", Name: "DEPRECATED",
Flags: []cli.Flag{ Flags: []cli.Flag{
@ -195,10 +199,7 @@ var AppHelpFlagGroups = []flagGroup{
}, },
}, },
{ {
Name: "EXPERIMENTAL", Name: "MISC",
Flags: []cli.Flag{
utils.WhisperEnabledFlag,
},
}, },
} }

View File

@ -440,11 +440,6 @@ var (
Usage: "Restricts network communication to the given IP networks (CIDR masks)", Usage: "Restricts network communication to the given IP networks (CIDR masks)",
} }
WhisperEnabledFlag = cli.BoolFlag{
Name: "shh",
Usage: "Enable Whisper",
}
// ATM the url is left to the user and deployment to // ATM the url is left to the user and deployment to
JSpathFlag = cli.StringFlag{ JSpathFlag = cli.StringFlag{
Name: "jspath", Name: "jspath",
@ -463,6 +458,20 @@ var (
Usage: "Suggested gas price is the given percentile of a set of recent transaction gas prices", Usage: "Suggested gas price is the given percentile of a set of recent transaction gas prices",
Value: eth.DefaultConfig.GPO.Percentile, Value: eth.DefaultConfig.GPO.Percentile,
} }
WhisperEnabledFlag = cli.BoolFlag{
Name: "shh",
Usage: "Enable Whisper",
}
WhisperMaxMessageSizeFlag = cli.IntFlag{
Name: "shh.maxmessagesize",
Usage: "Max message size accepted",
Value: int(whisper.DefaultMaxMessageSize),
}
WhisperMinPOWFlag = cli.Float64Flag{
Name: "shh.pow",
Usage: "Minimum POW accepted",
Value: whisper.DefaultMinimumPoW,
}
) )
// MakeDataDir retrieves the currently requested data directory, terminating // MakeDataDir retrieves the currently requested data directory, terminating
@ -878,6 +887,16 @@ func checkExclusive(ctx *cli.Context, flags ...cli.Flag) {
} }
} }
// SetShhConfig applies shh-related command line flags to the config.
func SetShhConfig(ctx *cli.Context, stack *node.Node, cfg *whisper.Config) {
if ctx.GlobalIsSet(WhisperMaxMessageSizeFlag.Name) {
cfg.MaxMessageSize = uint32(ctx.GlobalUint(WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(WhisperMinPOWFlag.Name) {
cfg.MinimumAcceptedPOW = ctx.GlobalFloat64(WhisperMinPOWFlag.Name)
}
}
// SetEthConfig applies eth-related command line flags to the config. // SetEthConfig applies eth-related command line flags to the config.
func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
// Avoid conflicting network flags // Avoid conflicting network flags
@ -983,8 +1002,10 @@ func RegisterEthService(stack *node.Node, cfg *eth.Config) {
} }
// RegisterShhService configures Whisper and adds it to the given node. // RegisterShhService configures Whisper and adds it to the given node.
func RegisterShhService(stack *node.Node) { func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
if err := stack.Register(func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { if err := stack.Register(func(n *node.ServiceContext) (node.Service, error) {
return whisper.New(cfg), nil
}); err != nil {
Fatalf("Failed to register the Whisper service: %v", err) Fatalf("Failed to register the Whisper service: %v", err)
} }
} }

View File

@ -87,7 +87,7 @@ var (
argVerbosity = flag.Int("verbosity", int(log.LvlError), "log verbosity level") argVerbosity = flag.Int("verbosity", int(log.LvlError), "log verbosity level")
argTTL = flag.Uint("ttl", 30, "time-to-live for messages in seconds") argTTL = flag.Uint("ttl", 30, "time-to-live for messages in seconds")
argWorkTime = flag.Uint("work", 5, "work time in seconds") argWorkTime = flag.Uint("work", 5, "work time in seconds")
argMaxSize = flag.Int("maxsize", whisper.DefaultMaxMessageLength, "max size of message") argMaxSize = flag.Uint("maxsize", uint(whisper.DefaultMaxMessageSize), "max size of message")
argPoW = flag.Float64("pow", whisper.DefaultMinimumPoW, "PoW for normal messages in float format (e.g. 2.7)") argPoW = flag.Float64("pow", whisper.DefaultMinimumPoW, "PoW for normal messages in float format (e.g. 2.7)")
argServerPoW = flag.Float64("mspow", whisper.DefaultMinimumPoW, "PoW requirement for Mail Server request") argServerPoW = flag.Float64("mspow", whisper.DefaultMinimumPoW, "PoW requirement for Mail Server request")
@ -198,6 +198,11 @@ func initialize() {
peers = append(peers, peer) peers = append(peers, peer)
} }
cfg := &whisper.Config{
MaxMessageSize: uint32(*argMaxSize),
MinimumAcceptedPOW: *argPoW,
}
if *mailServerMode { if *mailServerMode {
if len(msPassword) == 0 { if len(msPassword) == 0 {
msPassword, err = console.Stdin.PromptPassword("Please enter the Mail Server password: ") msPassword, err = console.Stdin.PromptPassword("Please enter the Mail Server password: ")
@ -205,11 +210,12 @@ func initialize() {
utils.Fatalf("Failed to read Mail Server password: %s", err) utils.Fatalf("Failed to read Mail Server password: %s", err)
} }
} }
shh = whisper.New()
shh = whisper.New(cfg)
shh.RegisterServer(&mailServer) shh.RegisterServer(&mailServer)
mailServer.Init(shh, *argDBPath, msPassword, *argServerPoW) mailServer.Init(shh, *argDBPath, msPassword, *argServerPoW)
} else { } else {
shh = whisper.New() shh = whisper.New(cfg)
} }
if *argPoW != whisper.DefaultMinimumPoW { if *argPoW != whisper.DefaultMinimumPoW {
@ -219,8 +225,8 @@ func initialize() {
} }
} }
if *argMaxSize != whisper.DefaultMaxMessageLength { if uint32(*argMaxSize) != whisper.DefaultMaxMessageSize {
err := shh.SetMaxMessageLength(*argMaxSize) err := shh.SetMaxMessageSize(uint32(*argMaxSize))
if err != nil { if err != nil {
utils.Fatalf("Failed to set max message size: %s", err) utils.Fatalf("Failed to set max message size: %s", err)
} }

View File

@ -526,10 +526,6 @@ const Shh_JS = `
web3._extend({ web3._extend({
property: 'shh', property: 'shh',
methods: [ methods: [
new web3._extend.Method({
name: 'info',
call: 'shh_info'
}),
new web3._extend.Method({ new web3._extend.Method({
name: 'setMaxMessageLength', name: 'setMaxMessageLength',
call: 'shh_setMaxMessageLength', call: 'shh_setMaxMessageLength',
@ -541,8 +537,8 @@ web3._extend({
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'allowP2PMessagesFromPeer', name: 'markTrustedPeer',
call: 'shh_allowP2PMessagesFromPeer', call: 'shh_markTrustedPeer',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
@ -570,58 +566,68 @@ web3._extend({
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'generateSymmetricKey', name: 'newSymKey',
call: 'shh_generateSymmetricKey', call: 'shh_newSymKey',
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'addSymmetricKeyDirect', name: 'addSymKey',
call: 'shh_addSymmetricKeyDirect', call: 'shh_addSymKey',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'addSymmetricKeyFromPassword', name: 'generateSymKeyFromPassword',
call: 'shh_addSymmetricKeyFromPassword', call: 'shh_generateSymKeyFromPassword',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'hasSymmetricKey', name: 'hasSymKey',
call: 'shh_hasSymmetricKey', call: 'shh_hasSymKey',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'getSymmetricKey', name: 'getSymKey',
call: 'shh_getSymmetricKey', call: 'shh_getSymKey',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'deleteSymmetricKey', name: 'deleteSymKey',
call: 'shh_deleteSymmetricKey', call: 'shh_deleteSymKey',
params: 1 params: 1
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'subscribe', name: 'subscribe',
call: 'shh_subscribe', call: 'shh_subscribe',
params: 1 params: 2
}), }),
new web3._extend.Method({ new web3._extend.Method({
name: 'unsubscribe', name: 'unsubscribe',
call: 'shh_unsubscribe', call: 'shh_unsubscribe',
params: 1 params: 1
}), }),
new web3._extend.Method({
name: 'getNewSubscriptionMessages',
call: 'shh_getNewSubscriptionMessages',
params: 1
}),
new web3._extend.Method({
name: 'getFloatingMessages',
call: 'shh_getFloatingMessages',
params: 1
}),
new web3._extend.Method({ new web3._extend.Method({
name: 'post', name: 'post',
call: 'shh_post', call: 'shh_post',
params: 1 params: 1
}),
new web3._extend.Method({
name: 'publicKey',
call: 'shh_getPublicKey',
params: 1
}),
new web3._extend.Method({
name: 'getFilterMessages',
call: 'shh_getFilterMessages',
params: 1
}),
new web3._extend.Method({
name: 'deleteMessageFilter',
call: 'shh_deleteMessageFilter',
params: 1
}),
new web3._extend.Method({
name: 'newMessageFilter',
call: 'shh_newMessageFilter',
params: 1
}) })
], ],
properties: properties:
@ -630,7 +636,11 @@ web3._extend({
name: 'version', name: 'version',
getter: 'shh_version', getter: 'shh_version',
outputFormatter: web3._extend.utils.toDecimal outputFormatter: web3._extend.utils.toDecimal
}) }),
new web3._extend.Property({
name: 'info',
getter: 'shh_info'
}),
] ]
}); });
` `

View File

@ -169,7 +169,9 @@ func NewNode(datadir string, config *NodeConfig) (stack *Node, _ error) {
} }
// Register the Whisper protocol if requested // Register the Whisper protocol if requested
if config.WhisperEnabled { if config.WhisperEnabled {
if err := rawStack.Register(func(*node.ServiceContext) (node.Service, error) { return whisper.New(), nil }); err != nil { if err := rawStack.Register(func(*node.ServiceContext) (node.Service, error) {
return whisper.New(&whisper.DefaultConfig), nil
}); err != nil {
return nil, fmt.Errorf("whisper init: %v", err) return nil, fmt.Errorf("whisper init: %v", err)
} }
} }

View File

@ -349,6 +349,52 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
return err return err
} }
// ShhSubscribe calls the "shh_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// The context argument cancels the RPC request that sets up the subscription but has no
// effect on the subscription after ShhSubscribe has returned.
//
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to ShhSubscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to ShhSubscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
msg, err := c.newMessage("shh"+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, "shh", chanVal),
}
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
return nil, err
}
if _, err := op.wait(ctx); err != nil {
return nil, err
}
return op.sub, nil
}
// EthSubscribe calls the "eth_subscribe" method with the given arguments, // EthSubscribe calls the "eth_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are // registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the // sent to the given channel. The element type of the channel must match the

27
vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View File

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View File

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

372
vendor/golang.org/x/sync/syncmap/map.go generated vendored Normal file
View File

@ -0,0 +1,372 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package syncmap provides a concurrent map implementation.
// It is a prototype for a proposed addition to the sync package
// in the standard library.
// (https://golang.org/issue/18177)
package syncmap
import (
"sync"
"sync/atomic"
"unsafe"
)
// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
// It is safe for multiple goroutines to call a Map's methods concurrently.
//
// The zero Map is valid and empty.
//
// A Map must not be copied after first use.
type Map struct {
mu sync.Mutex
// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Value // readOnly
// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[interface{}]*entry
// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
// p points to the interface{} value stored for the entry.
//
// If p == nil, the entry has been deleted and m.dirty == nil.
//
// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
// is missing from m.dirty.
//
// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
// != nil, in m.dirty[key].
//
// An entry can be deleted by atomic replacement with nil: when m.dirty is
// next created, it will atomically replace nil with expunged and leave
// m.dirty[key] unset.
//
// An entry's associated value can be updated by atomic replacement, provided
// p != expunged. If p == expunged, an entry's associated value can be updated
// only after first setting m.dirty[key] = e so that lookups using the dirty
// map find the entry.
p unsafe.Pointer // *interface{}
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// storeLocked unconditionally stores a value to the entry.
//
// The entry must be known not to be expunged.
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
// tryLoadOrStore atomically loads or stores a value if the entry is not
// expunged.
//
// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
// returns with ok==false.
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// Copy the interface after the first load to make this method more amenable
// to escape analysis: if we hit the "load" path or the entry is expunged, we
// shouldn't bother heap-allocating.
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot of the Map's
// contents: no key will be visited more than once, but if the value for any key
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
//
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (m *Map) Range(f func(key, value interface{}) bool) {
// We need to be able to iterate over all of the keys that were already
// present at the start of the call to Range.
// If read.amended is false, then read.m satisfies that property without
// requiring us to hold m.mu for a long time.
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
// (assuming the caller does not break out early), so a call to Range
// amortizes an entire copy of the map: we can promote the dirty copy
// immediately!
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

6
vendor/vendor.json vendored
View File

@ -507,6 +507,12 @@
"revision": "b4690f45fa1cafc47b1c280c2e75116efe40cc13", "revision": "b4690f45fa1cafc47b1c280c2e75116efe40cc13",
"revisionTime": "2017-02-15T08:41:58Z" "revisionTime": "2017-02-15T08:41:58Z"
}, },
{
"checksumSHA1": "4TEYFKrAUuwBMqExjQBsnf/CgjQ=",
"path": "golang.org/x/sync/syncmap",
"revision": "f52d1811a62927559de87708c8913c1650ce4f26",
"revisionTime": "2017-05-17T20:25:26Z"
},
{ {
"checksumSHA1": "rTPzsn0jeqfgnQR0OsMKR8JRy5Y=", "checksumSHA1": "rTPzsn0jeqfgnQR0OsMKR8JRy5Y=",
"path": "golang.org/x/sys/unix", "path": "golang.org/x/sys/unix",

View File

@ -88,7 +88,7 @@ func TestMailServer(t *testing.T) {
} }
var server WMailServer var server WMailServer
shh = whisper.New() shh = whisper.New(&whisper.DefaultConfig)
shh.RegisterServer(&server) shh.RegisterServer(&server)
server.Init(shh, dir, password, powRequirement) server.Init(shh, dir, password, powRequirement)

194
whisper/shhclient/client.go Normal file
View File

@ -0,0 +1,194 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package shhclient
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
)
// Client defines typed wrappers for the Whisper v5 RPC API.
type Client struct {
c *rpc.Client
}
// Dial connects a client to the given URL.
func Dial(rawurl string) (*Client, error) {
c, err := rpc.Dial(rawurl)
if err != nil {
return nil, err
}
return NewClient(c), nil
}
// NewClient creates a client that uses the given RPC client.
func NewClient(c *rpc.Client) *Client {
return &Client{c}
}
// Version returns the Whisper sub-protocol version.
func (sc *Client) Version(ctx context.Context) (uint, error) {
var result uint
err := sc.c.CallContext(ctx, &result, "shh_version")
return result, err
}
// Info returns diagnostic information about the whisper node.
func (sc *Client) Info(ctx context.Context) (whisper.Info, error) {
var info whisper.Info
err := sc.c.CallContext(ctx, &info, "shh_info")
return info, err
}
// SetMaxMessageSize sets the maximal message size allowed by this node. Incoming
// and outgoing messages with a larger size will be rejected. Whisper message size
// can never exceed the limit imposed by the underlying P2P protocol (10 Mb).
func (sc *Client) SetMaxMessageSize(ctx context.Context, size uint32) error {
var ignored bool
return sc.c.CallContext(ctx, &ignored, "shh_setMaxMessageSize", size)
}
// SetMinimumPoW (experimental) sets the minimal PoW required by this node.
// This experimental function was introduced for the future dynamic adjustment of
// PoW requirement. If the node is overwhelmed with messages, it should raise the
// PoW requirement and notify the peers. The new value should be set relative to
// the old value (e.g. double). The old value could be obtained via shh_info call.
func (sc *Client) SetMinimumPoW(ctx context.Context, pow float64) error {
var ignored bool
return sc.c.CallContext(ctx, &ignored, "shh_setMinPoW", pow)
}
// Marks specific peer trusted, which will allow it to send historic (expired) messages.
// Note This function is not adding new nodes, the node needs to exists as a peer.
func (sc *Client) MarkTrustedPeer(ctx context.Context, enode string) error {
var ignored bool
return sc.c.CallContext(ctx, &ignored, "shh_markTrustedPeer", enode)
}
// NewKeyPair generates a new public and private key pair for message decryption and encryption.
// It returns an identifier that can be used to refer to the key.
func (sc *Client) NewKeyPair(ctx context.Context) (string, error) {
var id string
return id, sc.c.CallContext(ctx, &id, "shh_newKeyPair")
}
// AddPrivateKey stored the key pair, and returns its ID.
func (sc *Client) AddPrivateKey(ctx context.Context, key []byte) (string, error) {
var id string
return id, sc.c.CallContext(ctx, &id, "shh_addPrivateKey", hexutil.Bytes(key))
}
// DeleteKeyPair delete the specifies key.
func (sc *Client) DeleteKeyPair(ctx context.Context, id string) (string, error) {
var ignored bool
return id, sc.c.CallContext(ctx, &ignored, "shh_deleteKeyPair", id)
}
// HasKeyPair returns an indication if the node has a private key or
// key pair matching the given ID.
func (sc *Client) HasKeyPair(ctx context.Context, id string) (bool, error) {
var has bool
return has, sc.c.CallContext(ctx, &has, "shh_hasKeyPair", id)
}
// PublicKey return the public key for a key ID.
func (sc *Client) PublicKey(ctx context.Context, id string) ([]byte, error) {
var key hexutil.Bytes
return []byte(key), sc.c.CallContext(ctx, &key, "shh_getPublicKey", id)
}
// PrivateKey return the private key for a key ID.
func (sc *Client) PrivateKey(ctx context.Context, id string) ([]byte, error) {
var key hexutil.Bytes
return []byte(key), sc.c.CallContext(ctx, &key, "shh_getPrivateKey", id)
}
// NewSymmetricKey generates a random symmetric key and returns its identifier.
// Can be used encrypting and decrypting messages where the key is known to both parties.
func (sc *Client) NewSymmetricKey(ctx context.Context) (string, error) {
var id string
return id, sc.c.CallContext(ctx, &id, "shh_newSymKey")
}
// AddSymmetricKey stores the key, and returns its identifier.
func (sc *Client) AddSymmetricKey(ctx context.Context, key []byte) (string, error) {
var id string
return id, sc.c.CallContext(ctx, &id, "shh_addSymKey", hexutil.Bytes(key))
}
// GenerateSymmetricKeyFromPassword generates the key from password, stores it, and returns its identifier.
func (sc *Client) GenerateSymmetricKeyFromPassword(ctx context.Context, passwd []byte) (string, error) {
var id string
return id, sc.c.CallContext(ctx, &id, "shh_generateSymKeyFromPassword", hexutil.Bytes(passwd))
}
// HasSymmetricKey returns an indication if the key associated with the given id is stored in the node.
func (sc *Client) HasSymmetricKey(ctx context.Context, id string) (bool, error) {
var found bool
return found, sc.c.CallContext(ctx, &found, "shh_hasSymKey", id)
}
// GetSymmetricKey returns the symmetric key associated with the given identifier.
func (sc *Client) GetSymmetricKey(ctx context.Context, id string) ([]byte, error) {
var key hexutil.Bytes
return []byte(key), sc.c.CallContext(ctx, &key, "shh_getSymKey", id)
}
// DeleteSymmetricKey deletes the symmetric key associated with the given identifier.
func (sc *Client) DeleteSymmetricKey(ctx context.Context, id string) error {
var ignored bool
return sc.c.CallContext(ctx, &ignored, "shh_deleteSymKey", id)
}
// Post a message onto the network.
func (sc *Client) Post(ctx context.Context, message whisper.NewMessage) error {
var ignored bool
return sc.c.CallContext(ctx, &ignored, "shh_post", message)
}
// SubscribeMessages subscribes to messages that match the given criteria. This method
// is only supported on bi-directional connections such as websockets and IPC.
// NewMessageFilter uses polling and is supported over HTTP.
func (ec *Client) SubscribeMessages(ctx context.Context, criteria whisper.Criteria, ch chan<- *whisper.Message) (ethereum.Subscription, error) {
return ec.c.ShhSubscribe(ctx, ch, "messages", criteria)
}
// NewMessageFilter creates a filter within the node. This filter can be used to poll
// for new messages (see FilterMessages) that satisfy the given criteria. A filter can
// timeout when it was polled for in whisper.filterTimeout.
func (ec *Client) NewMessageFilter(ctx context.Context, criteria whisper.Criteria) (string, error) {
var id string
return id, ec.c.CallContext(ctx, &id, "shh_newMessageFilter", criteria)
}
// DeleteMessageFilter removes the filter associated with the given id.
func (ec *Client) DeleteMessageFilter(ctx context.Context, id string) error {
var ignored bool
return ec.c.CallContext(ctx, &ignored, "shh_deleteMessageFilter", id)
}
// FilterMessages retrieves all messages that are received between the last call to
// this function and match the criteria that where given when the filter was created.
func (ec *Client) FilterMessages(ctx context.Context, id string) ([]*whisper.Message, error) {
var messages []*whisper.Message
return messages, ec.c.CallContext(ctx, &messages, "shh_getFilterMessages", id)
}

View File

@ -17,494 +17,575 @@
package whisperv5 package whisperv5
import ( import (
"encoding/json" "context"
"crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rpc"
) )
var whisperOfflineErr = errors.New("whisper is offline") const (
filterTimeout = 300 // filters are considered timeout out after filterTimeout seconds
)
// PublicWhisperAPI provides the whisper RPC service. var (
ErrSymAsym = errors.New("specify either a symetric or a asymmetric key")
ErrInvalidSymmetricKey = errors.New("invalid symmetric key")
ErrInvalidPublicKey = errors.New("invalid public key")
ErrInvalidSigningPubKey = errors.New("invalid signing public key")
ErrTooLowPoW = errors.New("message rejected, PoW too low")
ErrNoTopics = errors.New("missing topic(s)")
)
// PublicWhisperAPI provides the whisper RPC service that can be
// use publicly without security implications.
type PublicWhisperAPI struct { type PublicWhisperAPI struct {
whisper *Whisper w *Whisper
mu sync.Mutex
lastUsed map[string]time.Time // keeps track when a filter was polled for the last time.
} }
// NewPublicWhisperAPI create a new RPC whisper service. // NewPublicWhisperAPI create a new RPC whisper service.
func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI { func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI {
return &PublicWhisperAPI{whisper: w} api := &PublicWhisperAPI{
w: w,
lastUsed: make(map[string]time.Time),
}
go api.run()
return api
} }
// Start starts the Whisper worker threads. // run the api event loop.
func (api *PublicWhisperAPI) Start() error { // this loop deletes filter that have not been used within filterTimeout
if api.whisper == nil { func (api *PublicWhisperAPI) run() {
return whisperOfflineErr timeout := time.NewTicker(2 * time.Minute)
for {
<-timeout.C
api.mu.Lock()
for id, lastUsed := range api.lastUsed {
if time.Since(lastUsed).Seconds() >= filterTimeout {
delete(api.lastUsed, id)
if err := api.w.Unsubscribe(id); err != nil {
log.Error("could not unsubscribe whisper filter", "error", err)
}
log.Debug("delete whisper filter (timeout)", "id", id)
}
}
api.mu.Unlock()
} }
return api.whisper.Start(nil)
} }
// Stop stops the Whisper worker threads. // Version returns the Whisper sub-protocol version.
func (api *PublicWhisperAPI) Stop() error { func (api *PublicWhisperAPI) Version(ctx context.Context) string {
if api.whisper == nil { return ProtocolVersionStr
return whisperOfflineErr
}
return api.whisper.Stop()
} }
// Version returns the Whisper version this node offers. // Info contains diagnostic information.
func (api *PublicWhisperAPI) Version() (hexutil.Uint, error) { type Info struct {
if api.whisper == nil { Memory int `json:"memory"` // Memory size of the floating messages in bytes.
return 0, whisperOfflineErr Messages int `json:"messages"` // Number of floating messages.
} MinPow float64 `json:"minPow"` // Minimal accepted PoW
return hexutil.Uint(api.whisper.Version()), nil MaxMessageSize uint32 `json:"maxMessageSize"` // Maximum accepted message size
} }
// Info returns the Whisper statistics for diagnostics. // Info returns diagnostic information about the whisper node.
func (api *PublicWhisperAPI) Info() (string, error) { func (api *PublicWhisperAPI) Info(ctx context.Context) Info {
if api.whisper == nil { stats := api.w.Stats()
return "", whisperOfflineErr return Info{
Memory: stats.memoryUsed,
Messages: len(api.w.messageQueue) + len(api.w.p2pMsgQueue),
MinPow: api.w.MinPow(),
MaxMessageSize: api.w.MaxMessageSize(),
} }
return api.whisper.Stats(), nil
} }
// SetMaxMessageLength sets the maximal message length allowed by this node // SetMaxMessageSize sets the maximum message size that is accepted.
func (api *PublicWhisperAPI) SetMaxMessageLength(val int) error { // Upper limit is defined in whisperv5.MaxMessageSize.
if api.whisper == nil { func (api *PublicWhisperAPI) SetMaxMessageSize(ctx context.Context, size uint32) (bool, error) {
return whisperOfflineErr return true, api.w.SetMaxMessageSize(size)
}
return api.whisper.SetMaxMessageLength(val)
} }
// SetMinimumPoW sets the minimal PoW required by this node // SetMinPow sets the minimum PoW for a message before it is accepted.
func (api *PublicWhisperAPI) SetMinimumPoW(val float64) error { func (api *PublicWhisperAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) {
if api.whisper == nil { return true, api.w.SetMinimumPoW(pow)
return whisperOfflineErr
}
return api.whisper.SetMinimumPoW(val)
} }
// AllowP2PMessagesFromPeer marks specific peer trusted, which will allow it // MarkTrustedPeer marks a peer trusted. , which will allow it to send historic (expired) messages.
// to send historic (expired) messages. // Note: This function is not adding new nodes, the node needs to exists as a peer.
func (api *PublicWhisperAPI) AllowP2PMessagesFromPeer(enode string) error { func (api *PublicWhisperAPI) MarkTrustedPeer(ctx context.Context, enode string) (bool, error) {
if api.whisper == nil {
return whisperOfflineErr
}
n, err := discover.ParseNode(enode) n, err := discover.ParseNode(enode)
if err != nil { if err != nil {
return errors.New("failed to parse enode of trusted peer: " + err.Error()) return false, err
} }
return api.whisper.AllowP2PMessagesFromPeer(n.ID[:]) return true, api.w.AllowP2PMessagesFromPeer(n.ID[:])
} }
// HasKeyPair checks if the whisper node is configured with the private key // NewKeyPair generates a new public and private key pair for message decryption and encryption.
// of the specified public pair. // It returns an ID that can be used to refer to the keypair.
func (api *PublicWhisperAPI) HasKeyPair(id string) (bool, error) { func (api *PublicWhisperAPI) NewKeyPair(ctx context.Context) (string, error) {
if api.whisper == nil { return api.w.NewKeyPair()
return false, whisperOfflineErr
}
return api.whisper.HasKeyPair(id), nil
} }
// DeleteKeyPair deletes the specifies key if it exists. // AddPrivateKey imports the given private key.
func (api *PublicWhisperAPI) DeleteKeyPair(id string) (bool, error) { func (api *PublicWhisperAPI) AddPrivateKey(ctx context.Context, privateKey hexutil.Bytes) (string, error) {
if api.whisper == nil { key, err := crypto.ToECDSA(privateKey)
return false, whisperOfflineErr
}
return api.whisper.DeleteKeyPair(id), nil
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption.
func (api *PublicWhisperAPI) NewKeyPair() (string, error) {
if api.whisper == nil {
return "", whisperOfflineErr
}
return api.whisper.NewKeyPair()
}
// GetPublicKey returns the public key for identity id
func (api *PublicWhisperAPI) GetPublicKey(id string) (hexutil.Bytes, error) {
if api.whisper == nil {
return nil, whisperOfflineErr
}
key, err := api.whisper.GetPrivateKey(id)
if err != nil { if err != nil {
return nil, err return "", err
}
return api.w.AddKeyPair(key)
}
// DeleteKeyPair removes the key with the given key if it exists.
func (api *PublicWhisperAPI) DeleteKeyPair(ctx context.Context, key string) (bool, error) {
if ok := api.w.DeleteKeyPair(key); ok {
return true, nil
}
return false, fmt.Errorf("key pair %s not found", key)
}
// HasKeyPair returns an indication if the node has a key pair that is associated with the given id.
func (api *PublicWhisperAPI) HasKeyPair(ctx context.Context, id string) bool {
return api.w.HasKeyPair(id)
}
// GetPublicKey returns the public key associated with the given key. The key is the hex
// encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62.
func (api *PublicWhisperAPI) GetPublicKey(ctx context.Context, id string) (hexutil.Bytes, error) {
key, err := api.w.GetPrivateKey(id)
if err != nil {
return hexutil.Bytes{}, err
} }
return crypto.FromECDSAPub(&key.PublicKey), nil return crypto.FromECDSAPub(&key.PublicKey), nil
} }
// GetPrivateKey returns the private key for identity id // GetPublicKey returns the private key associated with the given key. The key is the hex
func (api *PublicWhisperAPI) GetPrivateKey(id string) (string, error) { // encoded representation of a key in the form specified in section 4.3.6 of ANSI X9.62.
if api.whisper == nil { func (api *PublicWhisperAPI) GetPrivateKey(ctx context.Context, id string) (hexutil.Bytes, error) {
return "", whisperOfflineErr key, err := api.w.GetPrivateKey(id)
}
key, err := api.whisper.GetPrivateKey(id)
if err != nil { if err != nil {
return "", err return hexutil.Bytes{}, err
} }
return common.ToHex(crypto.FromECDSA(key)), nil return crypto.FromECDSA(key), nil
} }
// GenerateSymmetricKey generates a random symmetric key and stores it under id, // NewSymKey generate a random symmetric key.
// which is then returned. Will be used in the future for session key exchange. // It returns an ID that can be used to refer to the key.
func (api *PublicWhisperAPI) GenerateSymmetricKey() (string, error) { // Can be used encrypting and decrypting messages where the key is known to both parties.
if api.whisper == nil { func (api *PublicWhisperAPI) NewSymKey(ctx context.Context) (string, error) {
return "", whisperOfflineErr return api.w.GenerateSymKey()
}
return api.whisper.GenerateSymKey()
} }
// AddSymmetricKeyDirect stores the key, and returns its id. // AddSymKey import a symmetric key.
func (api *PublicWhisperAPI) AddSymmetricKeyDirect(key hexutil.Bytes) (string, error) { // It returns an ID that can be used to refer to the key.
if api.whisper == nil { // Can be used encrypting and decrypting messages where the key is known to both parties.
return "", whisperOfflineErr func (api *PublicWhisperAPI) AddSymKey(ctx context.Context, key hexutil.Bytes) (string, error) {
} return api.w.AddSymKeyDirect([]byte(key))
return api.whisper.AddSymKeyDirect(key)
} }
// AddSymmetricKeyFromPassword generates the key from password, stores it, and returns its id. // GenerateSymKeyFromPassword derive a key from the given password, stores it, and returns its ID.
func (api *PublicWhisperAPI) AddSymmetricKeyFromPassword(password string) (string, error) { func (api *PublicWhisperAPI) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) {
if api.whisper == nil { return api.w.AddSymKeyFromPassword(passwd)
return "", whisperOfflineErr
}
return api.whisper.AddSymKeyFromPassword(password)
} }
// HasSymmetricKey returns true if there is a key associated with the given id. // HasSymKey returns an indication if the node has a symmetric key associated with the given key.
// Otherwise returns false. func (api *PublicWhisperAPI) HasSymKey(ctx context.Context, id string) bool {
func (api *PublicWhisperAPI) HasSymmetricKey(id string) (bool, error) { return api.w.HasSymKey(id)
if api.whisper == nil {
return false, whisperOfflineErr
}
res := api.whisper.HasSymKey(id)
return res, nil
} }
// GetSymmetricKey returns the symmetric key associated with the given id. // GetSymKey returns the symmetric key associated with the given id.
func (api *PublicWhisperAPI) GetSymmetricKey(name string) (hexutil.Bytes, error) { func (api *PublicWhisperAPI) GetSymKey(ctx context.Context, id string) (hexutil.Bytes, error) {
if api.whisper == nil { return api.w.GetSymKey(id)
return nil, whisperOfflineErr }
// DeleteSymKey deletes the symmetric key that is associated with the given id.
func (api *PublicWhisperAPI) DeleteSymKey(ctx context.Context, id string) bool {
return api.w.DeleteSymKey(id)
}
//go:generate gencodec -type NewMessage -field-override newMessageOverride -out gen_newmessage_json.go
// NewMessage represents a new whisper message that is posted through the RPC.
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey []byte `json:"pubKey"`
Sig string `json:"sig"`
TTL uint32 `json:"ttl"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
}
type newMessageOverride struct {
PublicKey hexutil.Bytes
Payload hexutil.Bytes
Padding hexutil.Bytes
}
// Post a message on the Whisper network.
func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) {
var (
symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0
err error
)
// user must specify either a symmetric or a asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
return false, ErrSymAsym
} }
b, err := api.whisper.GetSymKey(name)
params := &MessageParams{
TTL: req.TTL,
Payload: req.Payload,
Padding: req.Padding,
WorkTime: req.PowTime,
PoW: req.PowTarget,
Topic: req.Topic,
}
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
return false, err
}
}
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
return false, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
return false, err
}
if !validateSymmetricKey(params.KeySym) {
return false, ErrInvalidSymmetricKey
}
}
// Set asymmetric key that is used to encrypt the message
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
return false, ErrInvalidPublicKey
}
}
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil { if err != nil {
return nil, err return false, err
} }
return b, nil
env, err := whisperMsg.Wrap(params)
if err != nil {
return false, err
}
// send to specific node (skip PoW check)
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
if err != nil {
return false, fmt.Errorf("failed to parse target peer: %s", err)
}
return true, api.w.SendP2PMessage(n.ID[:], env)
}
// ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() {
return false, ErrTooLowPoW
}
return true, api.w.Send(env)
} }
// DeleteSymmetricKey deletes the key associated with the name string if it exists. //go:generate gencodec -type Criteria -field-override criteriaOverride -out gen_criteria_json.go
func (api *PublicWhisperAPI) DeleteSymmetricKey(name string) (bool, error) {
if api.whisper == nil { // Criteria holds various filter options for inbound messages.
return false, whisperOfflineErr type Criteria struct {
} SymKeyID string `json:"symKeyID"`
res := api.whisper.DeleteSymKey(name) PrivateKeyID string `json:"privateKeyID"`
return res, nil Sig []byte `json:"sig"`
MinPow float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P bool `json:"allowP2P"`
} }
// Subscribe creates and registers a new filter to watch for inbound whisper messages. type criteriaOverride struct {
// Returns the ID of the newly created filter. Sig hexutil.Bytes
func (api *PublicWhisperAPI) Subscribe(args WhisperFilterArgs) (string, error) { }
if api.whisper == nil {
return "", whisperOfflineErr // Messages set up a subscription that fires events when messages arrive that match
// the given set of criteria.
func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Subscription, error) {
var (
symKeyGiven = len(crit.SymKeyID) > 0
pubKeyGiven = len(crit.PrivateKeyID) > 0
err error
)
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
// user must specify either a symmetric or a asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
return nil, ErrSymAsym
} }
filter := Filter{ filter := Filter{
PoW: args.MinPoW, PoW: crit.MinPow,
Messages: make(map[common.Hash]*ReceivedMessage), Messages: make(map[common.Hash]*ReceivedMessage),
AllowP2P: args.AllowP2P, AllowP2P: crit.AllowP2P,
} }
var err error if len(crit.Sig) > 0 {
for i, bt := range args.Topics { filter.Src = crypto.ToECDSAPub(crit.Sig)
if len(bt) == 0 || len(bt) > 4 {
return "", errors.New(fmt.Sprintf("subscribe: topic %d has wrong size: %d", i, len(bt)))
}
filter.Topics = append(filter.Topics, bt)
}
if err = ValidateKeyID(args.Key); err != nil {
return "", errors.New("subscribe: " + err.Error())
}
if len(args.Sig) > 0 {
sb := common.FromHex(args.Sig)
if sb == nil {
return "", errors.New("subscribe: sig parameter is invalid")
}
filter.Src = crypto.ToECDSAPub(sb)
if !ValidatePublicKey(filter.Src) { if !ValidatePublicKey(filter.Src) {
return "", errors.New("subscribe: invalid 'sig' field") return nil, ErrInvalidSigningPubKey
} }
} }
if args.Symmetric { for i, bt := range crit.Topics {
if len(args.Topics) == 0 { if len(bt) == 0 || len(bt) > 4 {
return "", errors.New("subscribe: at least one topic must be specified with symmetric encryption") return nil, fmt.Errorf("subscribe: topic %d has wrong size: %d", i, len(bt))
} }
symKey, err := api.whisper.GetSymKey(args.Key) filter.Topics = append(filter.Topics, bt[:])
}
// listen for message that are encrypted with the given symmetric key
if symKeyGiven {
if len(filter.Topics) == 0 {
return nil, ErrNoTopics
}
key, err := api.w.GetSymKey(crit.SymKeyID)
if err != nil { if err != nil {
return "", errors.New("subscribe: invalid key ID") return nil, err
} }
if !validateSymmetricKey(symKey) { if !validateSymmetricKey(key) {
return "", errors.New("subscribe: retrieved key is invalid") return nil, ErrInvalidSymmetricKey
} }
filter.KeySym = symKey filter.KeySym = key
filter.SymKeyHash = crypto.Keccak256Hash(filter.KeySym) filter.SymKeyHash = crypto.Keccak256Hash(filter.KeySym)
} else { }
filter.KeyAsym, err = api.whisper.GetPrivateKey(args.Key)
if err != nil { // listen for messages that are encrypted with the given public key
return "", errors.New("subscribe: invalid key ID") if pubKeyGiven {
} filter.KeyAsym, err = api.w.GetPrivateKey(crit.PrivateKeyID)
if filter.KeyAsym == nil { if err != nil || filter.KeyAsym == nil {
return "", errors.New("subscribe: non-existent identity provided") return nil, ErrInvalidPublicKey
} }
} }
return api.whisper.Subscribe(&filter) id, err := api.w.Subscribe(&filter)
} if err != nil {
return nil, err
// Unsubscribe disables and removes an existing filter.
func (api *PublicWhisperAPI) Unsubscribe(id string) {
api.whisper.Unsubscribe(id)
}
// GetSubscriptionMessages retrieves all the new messages matched by the corresponding
// subscription filter since the last retrieval.
func (api *PublicWhisperAPI) GetNewSubscriptionMessages(id string) []*WhisperMessage {
f := api.whisper.GetFilter(id)
if f != nil {
newMail := f.Retrieve()
return toWhisperMessages(newMail)
} }
return toWhisperMessages(nil)
// create subscription and start waiting for message events
rpcSub := notifier.CreateSubscription()
go func() {
// for now poll internally, refactor whisper internal for channel support
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if filter := api.w.GetFilter(id); filter != nil {
for _, rpcMessage := range toMessage(filter.Retrieve()) {
if err := notifier.Notify(rpcSub.ID, rpcMessage); err != nil {
log.Error("Failed to send notification", "err", err)
}
}
}
case <-rpcSub.Err():
api.w.Unsubscribe(id)
return
case <-notifier.Closed():
api.w.Unsubscribe(id)
return
}
}
}()
return rpcSub, nil
} }
// GetMessages retrieves all the floating messages that match a specific subscription filter. //go:generate gencodec -type Message -field-override messageOverride -out gen_message_json.go
// It is likely to be called once per session, right after Subscribe call.
func (api *PublicWhisperAPI) GetFloatingMessages(id string) []*WhisperMessage { // Message is the RPC representation of a whisper message.
all := api.whisper.Messages(id) type Message struct {
return toWhisperMessages(all) Sig []byte `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
Topic TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PoW float64 `json:"pow"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
} }
// toWhisperMessages converts a Whisper message to a RPC whisper message. type messageOverride struct {
func toWhisperMessages(messages []*ReceivedMessage) []*WhisperMessage { Sig hexutil.Bytes
msgs := make([]*WhisperMessage, len(messages)) Payload hexutil.Bytes
Padding hexutil.Bytes
Hash hexutil.Bytes
Dst hexutil.Bytes
}
// ToWhisperMessage converts an internal message into an API version.
func ToWhisperMessage(message *ReceivedMessage) *Message {
msg := Message{
Payload: message.Payload,
Padding: message.Padding,
Timestamp: message.Sent,
TTL: message.TTL,
PoW: message.PoW,
Hash: message.EnvelopeHash.Bytes(),
Topic: message.Topic,
}
if message.Dst != nil {
b := crypto.FromECDSAPub(message.Dst)
if b != nil {
msg.Dst = b
}
}
if isMessageSigned(message.Raw[0]) {
b := crypto.FromECDSAPub(message.SigToPubKey())
if b != nil {
msg.Sig = b
}
}
return &msg
}
// toMessage converts a set of messages to its RPC representation.
func toMessage(messages []*ReceivedMessage) []*Message {
msgs := make([]*Message, len(messages))
for i, msg := range messages { for i, msg := range messages {
msgs[i] = NewWhisperMessage(msg) msgs[i] = ToWhisperMessage(msg)
} }
return msgs return msgs
} }
// Post creates a whisper message and injects it into the network for distribution. // GetFilterMessages returns the messages that match the filter criteria and
func (api *PublicWhisperAPI) Post(args PostArgs) error { // are received between the last poll and now.
if api.whisper == nil { func (api *PublicWhisperAPI) GetFilterMessages(id string) ([]*Message, error) {
return whisperOfflineErr api.mu.Lock()
f := api.w.GetFilter(id)
if f == nil {
api.mu.Unlock()
return nil, fmt.Errorf("filter not found")
}
api.lastUsed[id] = time.Now()
api.mu.Unlock()
receivedMessages := f.Retrieve()
messages := make([]*Message, 0, len(receivedMessages))
for _, msg := range receivedMessages {
messages = append(messages, ToWhisperMessage(msg))
} }
var err error return messages, nil
params := MessageParams{ }
TTL: args.TTL,
WorkTime: args.PowTime, // DeleteMessageFilter deletes a filter.
PoW: args.PowTarget, func (api *PublicWhisperAPI) DeleteMessageFilter(id string) (bool, error) {
Payload: args.Payload, api.mu.Lock()
Padding: args.Padding, defer api.mu.Unlock()
delete(api.lastUsed, id)
return true, api.w.Unsubscribe(id)
}
// NewMessageFilter creates a new filter that can be used to poll for
// (new) messages that satisfy the given criteria.
func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
var (
src *ecdsa.PublicKey
keySym []byte
keyAsym *ecdsa.PrivateKey
topics [][]byte
symKeyGiven = len(req.SymKeyID) > 0
asymKeyGiven = len(req.PrivateKeyID) > 0
err error
)
// user must specify either a symmetric or a asymmetric key
if (symKeyGiven && asymKeyGiven) || (!symKeyGiven && !asymKeyGiven) {
return "", ErrSymAsym
} }
if len(args.Key) == 0 { if len(req.Sig) > 0 {
return errors.New("post: key is missing") src = crypto.ToECDSAPub(req.Sig)
} if !ValidatePublicKey(src) {
return "", ErrInvalidSigningPubKey
if len(args.Sig) > 0 {
params.Src, err = api.whisper.GetPrivateKey(args.Sig)
if err != nil {
return err
}
if params.Src == nil {
return errors.New("post: empty identity")
} }
} }
if len(args.Topic) == TopicLength { if symKeyGiven {
params.Topic = BytesToTopic(args.Topic) if keySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
} else if len(args.Topic) != 0 { return "", err
return errors.New(fmt.Sprintf("post: wrong topic size %d", len(args.Topic))) }
if !validateSymmetricKey(keySym) {
return "", ErrInvalidSymmetricKey
}
} }
if args.Type == "sym" { if asymKeyGiven {
if err = ValidateKeyID(args.Key); err != nil { if keyAsym, err = api.w.GetPrivateKey(req.PrivateKeyID); err != nil {
return err return "", err
} }
params.KeySym, err = api.whisper.GetSymKey(args.Key)
if err != nil {
return err
}
if !validateSymmetricKey(params.KeySym) {
return errors.New("post: key for symmetric encryption is invalid")
}
if len(params.Topic) == 0 {
return errors.New("post: topic is missing for symmetric encryption")
}
} else if args.Type == "asym" {
kb := common.FromHex(args.Key)
if kb == nil {
return errors.New("post: public key for asymmetric encryption is invalid")
}
params.Dst = crypto.ToECDSAPub(kb)
if !ValidatePublicKey(params.Dst) {
return errors.New("post: public key for asymmetric encryption is invalid")
}
} else {
return errors.New("post: wrong type (sym/asym)")
} }
// encrypt and send if len(req.Topics) > 0 {
message, err := NewSentMessage(&params) topics = make([][]byte, 1)
for _, topic := range req.Topics {
topics = append(topics, topic[:])
}
}
f := &Filter{
Src: src,
KeySym: keySym,
KeyAsym: keyAsym,
PoW: req.MinPow,
AllowP2P: req.AllowP2P,
Topics: topics,
Messages: make(map[common.Hash]*ReceivedMessage),
}
id, err := api.w.Subscribe(f)
if err != nil { if err != nil {
return err return "", err
}
envelope, err := message.Wrap(&params)
if err != nil {
return err
}
if envelope.size() > api.whisper.maxMsgLength {
return errors.New("post: message is too big")
} }
if len(args.TargetPeer) != 0 { api.mu.Lock()
n, err := discover.ParseNode(args.TargetPeer) api.lastUsed[id] = time.Now()
if err != nil { api.mu.Unlock()
return errors.New("post: failed to parse enode of target peer: " + err.Error())
}
return api.whisper.SendP2PMessage(n.ID[:], envelope)
} else if args.PowTarget < api.whisper.minPoW {
return errors.New("post: target PoW is less than minimum PoW, the message can not be sent")
}
return api.whisper.Send(envelope) return id, nil
}
type PostArgs struct {
Type string `json:"type"` // "sym"/"asym" (symmetric or asymmetric)
TTL uint32 `json:"ttl"` // time-to-live in seconds
Sig string `json:"sig"` // id of the signing key
Key string `json:"key"` // key id (in case of sym) or public key (in case of asym)
Topic hexutil.Bytes `json:"topic"` // topic (4 bytes)
Padding hexutil.Bytes `json:"padding"` // optional padding bytes
Payload hexutil.Bytes `json:"payload"` // payload to be encrypted
PowTime uint32 `json:"powTime"` // maximal time in seconds to be spent on PoW
PowTarget float64 `json:"powTarget"` // minimal PoW required for this message
TargetPeer string `json:"targetPeer"` // peer id (for p2p message only)
}
type WhisperFilterArgs struct {
Symmetric bool // encryption type
Key string // id of the key to be used for decryption
Sig string // public key of the sender to be verified
MinPoW float64 // minimal PoW requirement
Topics [][]byte // list of topics (up to 4 bytes each) to match
AllowP2P bool // indicates wheather direct p2p messages are allowed for this filter
}
// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a
// JSON message blob into a WhisperFilterArgs structure.
func (args *WhisperFilterArgs) UnmarshalJSON(b []byte) (err error) {
// Unmarshal the JSON message and sanity check
var obj struct {
Type string `json:"type"`
Key string `json:"key"`
Sig string `json:"sig"`
MinPoW float64 `json:"minPoW"`
Topics []interface{} `json:"topics"`
AllowP2P bool `json:"allowP2P"`
}
if err := json.Unmarshal(b, &obj); err != nil {
return err
}
switch obj.Type {
case "sym":
args.Symmetric = true
case "asym":
args.Symmetric = false
default:
return errors.New("wrong type (sym/asym)")
}
args.Key = obj.Key
args.Sig = obj.Sig
args.MinPoW = obj.MinPoW
args.AllowP2P = obj.AllowP2P
// Construct the topic array
if obj.Topics != nil {
topics := make([]string, len(obj.Topics))
for i, field := range obj.Topics {
switch value := field.(type) {
case string:
topics[i] = value
case nil:
return fmt.Errorf("topic[%d] is empty", i)
default:
return fmt.Errorf("topic[%d] is not a string", i)
}
}
topicsDecoded := make([][]byte, len(topics))
for j, s := range topics {
x := common.FromHex(s)
if x == nil || len(x) > TopicLength {
return fmt.Errorf("topic[%d] is invalid", j)
}
topicsDecoded[j] = x
}
args.Topics = topicsDecoded
}
return nil
}
// WhisperMessage is the RPC representation of a whisper message.
type WhisperMessage struct {
Topic string `json:"topic"`
Payload string `json:"payload"`
Padding string `json:"padding"`
Src string `json:"sig"`
Dst string `json:"recipientPublicKey"`
Timestamp uint32 `json:"timestamp"`
TTL uint32 `json:"ttl"`
PoW float64 `json:"pow"`
Hash string `json:"hash"`
}
// NewWhisperMessage converts an internal message into an API version.
func NewWhisperMessage(message *ReceivedMessage) *WhisperMessage {
msg := WhisperMessage{
Payload: common.ToHex(message.Payload),
Padding: common.ToHex(message.Padding),
Timestamp: message.Sent,
TTL: message.TTL,
PoW: message.PoW,
Hash: common.ToHex(message.EnvelopeHash.Bytes()),
}
if len(message.Topic) == TopicLength {
msg.Topic = common.ToHex(message.Topic[:])
}
if message.Dst != nil {
b := crypto.FromECDSAPub(message.Dst)
if b != nil {
msg.Dst = common.ToHex(b)
}
}
if isMessageSigned(message.Raw[0]) {
b := crypto.FromECDSAPub(message.SigToPubKey())
if b != nil {
msg.Src = common.ToHex(b)
}
}
return &msg
} }

View File

@ -1,680 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package whisperv5
import (
"bytes"
"encoding/json"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func TestBasic(t *testing.T) {
var id string = "test"
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
ver, err := api.Version()
if err != nil {
t.Fatalf("failed generateFilter: %s.", err)
}
if uint64(ver) != ProtocolVersion {
t.Fatalf("wrong version: %d.", ver)
}
mail := api.GetNewSubscriptionMessages("non-existent-id")
if len(mail) != 0 {
t.Fatalf("failed GetFilterChanges: premature result")
}
exist, err := api.HasKeyPair(id)
if err != nil {
t.Fatalf("failed initial HasIdentity: %s.", err)
}
if exist {
t.Fatalf("failed initial HasIdentity: false positive.")
}
success, err := api.DeleteKeyPair(id)
if err != nil {
t.Fatalf("failed DeleteIdentity: %s.", err)
}
if success {
t.Fatalf("deleted non-existing identity: false positive.")
}
pub, err := api.NewKeyPair()
if err != nil {
t.Fatalf("failed NewIdentity: %s.", err)
}
if len(pub) == 0 {
t.Fatalf("failed NewIdentity: empty")
}
exist, err = api.HasKeyPair(pub)
if err != nil {
t.Fatalf("failed HasIdentity: %s.", err)
}
if !exist {
t.Fatalf("failed HasIdentity: false negative.")
}
success, err = api.DeleteKeyPair(pub)
if err != nil {
t.Fatalf("failed to delete second identity: %s.", err)
}
if !success {
t.Fatalf("failed to delete second identity.")
}
exist, err = api.HasKeyPair(pub)
if err != nil {
t.Fatalf("failed HasIdentity(): %s.", err)
}
if exist {
t.Fatalf("failed HasIdentity(): false positive.")
}
id = "arbitrary text"
id2 := "another arbitrary string"
exist, err = api.HasSymmetricKey(id)
if err != nil {
t.Fatalf("failed HasSymKey: %s.", err)
}
if exist {
t.Fatalf("failed HasSymKey: false positive.")
}
id, err = api.GenerateSymmetricKey()
if err != nil {
t.Fatalf("failed GenerateSymKey: %s.", err)
}
exist, err = api.HasSymmetricKey(id)
if err != nil {
t.Fatalf("failed HasSymKey(): %s.", err)
}
if !exist {
t.Fatalf("failed HasSymKey(): false negative.")
}
const password = "some stuff here"
id, err = api.AddSymmetricKeyFromPassword(password)
if err != nil {
t.Fatalf("failed AddSymKey: %s.", err)
}
id2, err = api.AddSymmetricKeyFromPassword(password)
if err != nil {
t.Fatalf("failed AddSymKey: %s.", err)
}
exist, err = api.HasSymmetricKey(id2)
if err != nil {
t.Fatalf("failed HasSymKey(id2): %s.", err)
}
if !exist {
t.Fatalf("failed HasSymKey(id2): false negative.")
}
k1, err := api.GetSymmetricKey(id)
if err != nil {
t.Fatalf("failed GetSymKey(id): %s.", err)
}
k2, err := api.GetSymmetricKey(id2)
if err != nil {
t.Fatalf("failed GetSymKey(id2): %s.", err)
}
if !bytes.Equal(k1, k2) {
t.Fatalf("installed keys are not equal")
}
exist, err = api.DeleteSymmetricKey(id)
if err != nil {
t.Fatalf("failed DeleteSymKey(id): %s.", err)
}
if !exist {
t.Fatalf("failed DeleteSymKey(id): false negative.")
}
exist, err = api.HasSymmetricKey(id)
if err != nil {
t.Fatalf("failed HasSymKey(id): %s.", err)
}
if exist {
t.Fatalf("failed HasSymKey(id): false positive.")
}
}
func TestUnmarshalFilterArgs(t *testing.T) {
s := []byte(`{
"type":"sym",
"key":"0x70c87d191324e6712a591f304b4eedef6ad9bb9d",
"sig":"0x9b2055d370f73ec7d8a03e965129118dc8f5bf83",
"minPoW":2.34,
"topics":["0x00000000", "0x007f80ff", "0xff807f00", "0xf26e7779"],
"allowP2P":true
}`)
var f WhisperFilterArgs
err := f.UnmarshalJSON(s)
if err != nil {
t.Fatalf("failed UnmarshalJSON: %s.", err)
}
if !f.Symmetric {
t.Fatalf("wrong type.")
}
if f.Key != "0x70c87d191324e6712a591f304b4eedef6ad9bb9d" {
t.Fatalf("wrong key: %s.", f.Key)
}
if f.Sig != "0x9b2055d370f73ec7d8a03e965129118dc8f5bf83" {
t.Fatalf("wrong sig: %s.", f.Sig)
}
if f.MinPoW != 2.34 {
t.Fatalf("wrong MinPoW: %f.", f.MinPoW)
}
if !f.AllowP2P {
t.Fatalf("wrong AllowP2P.")
}
if len(f.Topics) != 4 {
t.Fatalf("wrong topics number: %d.", len(f.Topics))
}
i := 0
if !bytes.Equal(f.Topics[i], []byte{0x00, 0x00, 0x00, 0x00}) {
t.Fatalf("wrong topic[%d]: %x.", i, f.Topics[i])
}
i++
if !bytes.Equal(f.Topics[i], []byte{0x00, 0x7f, 0x80, 0xff}) {
t.Fatalf("wrong topic[%d]: %x.", i, f.Topics[i])
}
i++
if !bytes.Equal(f.Topics[i], []byte{0xff, 0x80, 0x7f, 0x00}) {
t.Fatalf("wrong topic[%d]: %x.", i, f.Topics[i])
}
i++
if !bytes.Equal(f.Topics[i], []byte{0xf2, 0x6e, 0x77, 0x79}) {
t.Fatalf("wrong topic[%d]: %x.", i, f.Topics[i])
}
}
func TestUnmarshalPostArgs(t *testing.T) {
s := []byte(`{
"type":"sym",
"ttl":12345,
"sig":"0x70c87d191324e6712a591f304b4eedef6ad9bb9d",
"key":"0x9b2055d370f73ec7d8a03e965129118dc8f5bf83",
"topic":"0xf26e7779",
"padding":"0x74686973206973206D79207465737420737472696E67",
"payload":"0x7061796C6F61642073686F756C642062652070736575646F72616E646F6D",
"powTime":777,
"powTarget":3.1416,
"targetPeer":"enode://915533f667b1369793ebb9bda022416b1295235a1420799cd87a969467372546d808ebf59c5c9ce23f103d59b61b97df8af91f0908552485975397181b993461@127.0.0.1:12345"
}`)
var a PostArgs
err := json.Unmarshal(s, &a)
if err != nil {
t.Fatalf("failed UnmarshalJSON: %s.", err)
}
if a.Type != "sym" {
t.Fatalf("wrong Type: %s.", a.Type)
}
if a.TTL != 12345 {
t.Fatalf("wrong ttl: %d.", a.TTL)
}
if a.Sig != "0x70c87d191324e6712a591f304b4eedef6ad9bb9d" {
t.Fatalf("wrong From: %s.", a.Sig)
}
if a.Key != "0x9b2055d370f73ec7d8a03e965129118dc8f5bf83" {
t.Fatalf("wrong Key: %s.", a.Key)
}
if BytesToTopic(a.Topic) != (TopicType{0xf2, 0x6e, 0x77, 0x79}) {
t.Fatalf("wrong topic: %x.", a.Topic)
}
if string(a.Padding) != "this is my test string" {
t.Fatalf("wrong Padding: %s.", string(a.Padding))
}
if string(a.Payload) != "payload should be pseudorandom" {
t.Fatalf("wrong Payload: %s.", string(a.Payload))
}
if a.PowTime != 777 {
t.Fatalf("wrong PowTime: %d.", a.PowTime)
}
if a.PowTarget != 3.1416 {
t.Fatalf("wrong PowTarget: %f.", a.PowTarget)
}
if a.TargetPeer != "enode://915533f667b1369793ebb9bda022416b1295235a1420799cd87a969467372546d808ebf59c5c9ce23f103d59b61b97df8af91f0908552485975397181b993461@127.0.0.1:12345" {
t.Fatalf("wrong PeerID: %s.", a.TargetPeer)
}
}
func waitForMessages(api *PublicWhisperAPI, id string, target int) []*WhisperMessage {
// timeout: 2 seconds
result := make([]*WhisperMessage, 0, target)
for i := 0; i < 100; i++ {
mail := api.GetNewSubscriptionMessages(id)
if len(mail) > 0 {
for _, m := range mail {
result = append(result, m)
}
if len(result) >= target {
break
}
}
time.Sleep(time.Millisecond * 20)
}
return result
}
func TestIntegrationAsym(t *testing.T) {
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
api.Start()
defer api.Stop()
sig, err := api.NewKeyPair()
if err != nil {
t.Fatalf("failed NewIdentity: %s.", err)
}
if len(sig) == 0 {
t.Fatalf("wrong signature")
}
exist, err := api.HasKeyPair(sig)
if err != nil {
t.Fatalf("failed HasIdentity: %s.", err)
}
if !exist {
t.Fatalf("failed HasIdentity: false negative.")
}
sigPubKey, err := api.GetPublicKey(sig)
if err != nil {
t.Fatalf("failed GetPublicKey: %s.", err)
}
key, err := api.NewKeyPair()
if err != nil {
t.Fatalf("failed NewIdentity(): %s.", err)
}
if len(key) == 0 {
t.Fatalf("wrong key")
}
dstPubKey, err := api.GetPublicKey(key)
if err != nil {
t.Fatalf("failed GetPublicKey: %s.", err)
}
var topics [2]TopicType
topics[0] = TopicType{0x00, 0x64, 0x00, 0xff}
topics[1] = TopicType{0xf2, 0x6e, 0x77, 0x79}
var f WhisperFilterArgs
f.Symmetric = false
f.Key = key
f.Sig = sigPubKey.String()
f.Topics = make([][]byte, 2)
f.Topics[0] = topics[0][:]
f.Topics[1] = topics[1][:]
f.MinPoW = DefaultMinimumPoW / 2
f.AllowP2P = true
id, err := api.Subscribe(f)
if err != nil {
t.Fatalf("failed to create new filter: %s.", err)
}
var p PostArgs
p.Type = "asym"
p.TTL = 2
p.Sig = sig
p.Key = dstPubKey.String()
p.Padding = []byte("test string")
p.Payload = []byte("extended test string")
p.PowTarget = DefaultMinimumPoW
p.PowTime = 2
p.Topic = hexutil.Bytes{0xf2, 0x6e, 0x77, 0x79} // topics[1]
err = api.Post(p)
if err != nil {
t.Errorf("failed to post message: %s.", err)
}
mail := waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed to GetFilterChanges: got %d messages.", len(mail))
}
text := string(common.FromHex(mail[0].Payload))
if text != string("extended test string") {
t.Fatalf("failed to decrypt first message: %s.", text)
}
p.Padding = []byte("new value")
p.Payload = []byte("extended new value")
err = api.Post(p)
if err != nil {
t.Fatalf("failed to post next message: %s.", err)
}
mail = waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed to GetFilterChanges: got %d messages.", len(mail))
}
text = string(common.FromHex(mail[0].Payload))
if text != string("extended new value") {
t.Fatalf("failed to decrypt second message: %s.", text)
}
}
func TestIntegrationSym(t *testing.T) {
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
api.Start()
defer api.Stop()
symKeyID, err := api.GenerateSymmetricKey()
if err != nil {
t.Fatalf("failed GenerateSymKey: %s.", err)
}
sig, err := api.NewKeyPair()
if err != nil {
t.Fatalf("failed NewKeyPair: %s.", err)
}
if len(sig) == 0 {
t.Fatalf("wrong signature")
}
sigPubKey, err := api.GetPublicKey(sig)
if err != nil {
t.Fatalf("failed GetPublicKey: %s.", err)
}
exist, err := api.HasKeyPair(sig)
if err != nil {
t.Fatalf("failed HasIdentity: %s.", err)
}
if !exist {
t.Fatalf("failed HasIdentity: false negative.")
}
var topics [2]TopicType
topics[0] = TopicType{0x00, 0x7f, 0x80, 0xff}
topics[1] = TopicType{0xf2, 0x6e, 0x77, 0x79}
var f WhisperFilterArgs
f.Symmetric = true
f.Key = symKeyID
f.Topics = make([][]byte, 2)
f.Topics[0] = topics[0][:]
f.Topics[1] = topics[1][:]
f.MinPoW = DefaultMinimumPoW / 2
f.Sig = sigPubKey.String()
f.AllowP2P = false
id, err := api.Subscribe(f)
if err != nil {
t.Fatalf("failed to create new filter: %s.", err)
}
var p PostArgs
p.Type = "sym"
p.TTL = 1
p.Key = symKeyID
p.Sig = sig
p.Padding = []byte("test string")
p.Payload = []byte("extended test string")
p.PowTarget = DefaultMinimumPoW
p.PowTime = 2
p.Topic = hexutil.Bytes{0xf2, 0x6e, 0x77, 0x79}
err = api.Post(p)
if err != nil {
t.Fatalf("failed to post first message: %s.", err)
}
mail := waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed GetFilterChanges: got %d messages.", len(mail))
}
text := string(common.FromHex(mail[0].Payload))
if text != string("extended test string") {
t.Fatalf("failed to decrypt first message: %s.", text)
}
p.Padding = []byte("new value")
p.Payload = []byte("extended new value")
err = api.Post(p)
if err != nil {
t.Fatalf("failed to post second message: %s.", err)
}
mail = waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed second GetFilterChanges: got %d messages.", len(mail))
}
text = string(common.FromHex(mail[0].Payload))
if text != string("extended new value") {
t.Fatalf("failed to decrypt second message: %s.", text)
}
}
func TestIntegrationSymWithFilter(t *testing.T) {
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
api.Start()
defer api.Stop()
symKeyID, err := api.GenerateSymmetricKey()
if err != nil {
t.Fatalf("failed to GenerateSymKey: %s.", err)
}
sigKeyID, err := api.NewKeyPair()
if err != nil {
t.Fatalf("failed NewIdentity: %s.", err)
}
if len(sigKeyID) == 0 {
t.Fatalf("wrong signature.")
}
exist, err := api.HasKeyPair(sigKeyID)
if err != nil {
t.Fatalf("failed HasIdentity: %s.", err)
}
if !exist {
t.Fatalf("failed HasIdentity: does not exist.")
}
sigPubKey, err := api.GetPublicKey(sigKeyID)
if err != nil {
t.Fatalf("failed GetPublicKey: %s.", err)
}
var topics [2]TopicType
topics[0] = TopicType{0x00, 0x7f, 0x80, 0xff}
topics[1] = TopicType{0xf2, 0x6e, 0x77, 0x79}
var f WhisperFilterArgs
f.Symmetric = true
f.Key = symKeyID
f.Topics = make([][]byte, 2)
f.Topics[0] = topics[0][:]
f.Topics[1] = topics[1][:]
f.MinPoW = DefaultMinimumPoW / 2
f.Sig = sigPubKey.String()
f.AllowP2P = false
id, err := api.Subscribe(f)
if err != nil {
t.Fatalf("failed to create new filter: %s.", err)
}
var p PostArgs
p.Type = "sym"
p.TTL = 1
p.Key = symKeyID
p.Sig = sigKeyID
p.Padding = []byte("test string")
p.Payload = []byte("extended test string")
p.PowTarget = DefaultMinimumPoW
p.PowTime = 2
p.Topic = hexutil.Bytes{0xf2, 0x6e, 0x77, 0x79}
err = api.Post(p)
if err != nil {
t.Fatalf("failed to post message: %s.", err)
}
mail := waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed to GetFilterChanges: got %d messages.", len(mail))
}
text := string(common.FromHex(mail[0].Payload))
if text != string("extended test string") {
t.Fatalf("failed to decrypt first message: %s.", text)
}
p.Padding = []byte("new value")
p.Payload = []byte("extended new value")
err = api.Post(p)
if err != nil {
t.Fatalf("failed to post next message: %s.", err)
}
mail = waitForMessages(api, id, 1)
if len(mail) != 1 {
t.Fatalf("failed to GetFilterChanges: got %d messages.", len(mail))
}
text = string(common.FromHex(mail[0].Payload))
if text != string("extended new value") {
t.Fatalf("failed to decrypt second message: %s.", text)
}
}
func TestKey(t *testing.T) {
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
k, err := api.AddSymmetricKeyFromPassword("wwww")
if err != nil {
t.Fatalf("failed to create key: %s.", err)
}
s, err := api.GetSymmetricKey(k)
if err != nil {
t.Fatalf("failed to get sym key: %s.", err)
}
k2, err := api.AddSymmetricKeyDirect(s)
if err != nil {
t.Fatalf("failed to add sym key: %s.", err)
}
s2, err := api.GetSymmetricKey(k2)
if err != nil {
t.Fatalf("failed to get sym key: %s.", err)
}
if s.String() != "0x448652d595bd6ec00b2a9ea220ad6c26592d9bf4cf79023d3c1b30cb681e6e07" {
t.Fatalf("wrong key from password: %s", s.String())
}
if !bytes.Equal(s, s2) {
t.Fatalf("wrong key")
}
}
func TestSubscribe(t *testing.T) {
var err error
var s string
w := New()
api := NewPublicWhisperAPI(w)
if api == nil {
t.Fatalf("failed to create API.")
}
symKeyID, err := api.GenerateSymmetricKey()
if err != nil {
t.Fatalf("failed to GenerateSymKey: %s.", err)
}
var f WhisperFilterArgs
f.Symmetric = true
f.Key = symKeyID
f.Topics = make([][]byte, 5)
f.Topics[0] = []byte{0x21}
f.Topics[1] = []byte{0xd2, 0xe3}
f.Topics[2] = []byte{0x64, 0x75, 0x76}
f.Topics[3] = []byte{0xf8, 0xe9, 0xa0, 0xba}
f.Topics[4] = []byte{0xcb, 0x3c, 0xdd, 0xee, 0xff}
s, err = api.Subscribe(f)
if err == nil {
t.Fatalf("Subscribe: false positive.")
}
f.Topics[4] = []byte{}
if err == nil {
t.Fatalf("Subscribe: false positive again.")
}
f.Topics[4] = []byte{0x00}
s, err = api.Subscribe(f)
if err != nil {
t.Fatalf("failed to subscribe: %s.", err)
} else {
api.Unsubscribe(s)
}
}

View File

@ -0,0 +1,29 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package whisperv5
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
}
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPOW: DefaultMinimumPoW,
}
var ()

View File

@ -55,8 +55,9 @@ const (
AESNonceLength = 12 AESNonceLength = 12
keyIdSize = 32 keyIdSize = 32
DefaultMaxMessageLength = 1024 * 1024 MaxMessageSize = uint32(10 * 1024 * 1024) // maximum accepted size of a message.
DefaultMinimumPoW = 0.2 DefaultMaxMessageSize = uint32(1024 * 1024)
DefaultMinimumPoW = 0.2
padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24) padSizeLimit = 256 // just an arbitrary number, could be changed without breaking the protocol (must not exceed 2^24)
messageQueueLimit = 1024 messageQueueLimit = 1024

View File

@ -62,7 +62,7 @@ func (e *Envelope) rlpWithoutNonce() []byte {
// NewEnvelope wraps a Whisper message with expiration and destination data // NewEnvelope wraps a Whisper message with expiration and destination data
// included into an envelope for network forwarding. // included into an envelope for network forwarding.
func NewEnvelope(ttl uint32, topic TopicType, aesNonce []byte, msg *SentMessage) *Envelope { func NewEnvelope(ttl uint32, topic TopicType, aesNonce []byte, msg *sentMessage) *Envelope {
env := Envelope{ env := Envelope{
Version: make([]byte, 1), Version: make([]byte, 1),
Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()), Expiry: uint32(time.Now().Add(time.Second * time.Duration(ttl)).Unix()),

View File

@ -163,6 +163,7 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) {
for _, msg := range f.Messages { for _, msg := range f.Messages {
all = append(all, msg) all = append(all, msg)
} }
f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages
return all return all
} }

View File

@ -97,7 +97,7 @@ func TestInstallFilters(t *testing.T) {
InitSingleTest() InitSingleTest()
const SizeTestFilters = 256 const SizeTestFilters = 256
w := New() w := New(&Config{})
filters := NewFilters(w) filters := NewFilters(w)
tst := generateTestCases(t, SizeTestFilters) tst := generateTestCases(t, SizeTestFilters)
@ -542,7 +542,7 @@ func TestWatchers(t *testing.T) {
var x, firstID string var x, firstID string
var err error var err error
w := New() w := New(&Config{})
filters := NewFilters(w) filters := NewFilters(w)
tst := generateTestCases(t, NumFilters) tst := generateTestCases(t, NumFilters)
for i = 0; i < NumFilters; i++ { for i = 0; i < NumFilters; i++ {

View File

@ -0,0 +1,62 @@
// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
package whisperv5
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func (c Criteria) MarshalJSON() ([]byte, error) {
type Criteria struct {
SymKeyID string `json:"symKeyID"`
PrivateKeyID string `json:"privateKeyID"`
Sig hexutil.Bytes `json:"sig"`
MinPow float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P bool `json:"allowP2P"`
}
var enc Criteria
enc.SymKeyID = c.SymKeyID
enc.PrivateKeyID = c.PrivateKeyID
enc.Sig = c.Sig
enc.MinPow = c.MinPow
enc.Topics = c.Topics
enc.AllowP2P = c.AllowP2P
return json.Marshal(&enc)
}
func (c *Criteria) UnmarshalJSON(input []byte) error {
type Criteria struct {
SymKeyID *string `json:"symKeyID"`
PrivateKeyID *string `json:"privateKeyID"`
Sig hexutil.Bytes `json:"sig"`
MinPow *float64 `json:"minPow"`
Topics []TopicType `json:"topics"`
AllowP2P *bool `json:"allowP2P"`
}
var dec Criteria
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.SymKeyID != nil {
c.SymKeyID = *dec.SymKeyID
}
if dec.PrivateKeyID != nil {
c.PrivateKeyID = *dec.PrivateKeyID
}
if dec.Sig != nil {
c.Sig = dec.Sig
}
if dec.MinPow != nil {
c.MinPow = *dec.MinPow
}
if dec.Topics != nil {
c.Topics = dec.Topics
}
if dec.AllowP2P != nil {
c.AllowP2P = *dec.AllowP2P
}
return nil
}

View File

@ -0,0 +1,80 @@
// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
package whisperv5
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func (m Message) MarshalJSON() ([]byte, error) {
type Message struct {
Sig hexutil.Bytes `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
Topic TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PoW float64 `json:"pow"`
Hash hexutil.Bytes `json:"hash"`
Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"`
}
var enc Message
enc.Sig = m.Sig
enc.TTL = m.TTL
enc.Timestamp = m.Timestamp
enc.Topic = m.Topic
enc.Payload = m.Payload
enc.Padding = m.Padding
enc.PoW = m.PoW
enc.Hash = m.Hash
enc.Dst = m.Dst
return json.Marshal(&enc)
}
func (m *Message) UnmarshalJSON(input []byte) error {
type Message struct {
Sig hexutil.Bytes `json:"sig,omitempty"`
TTL *uint32 `json:"ttl"`
Timestamp *uint32 `json:"timestamp"`
Topic *TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PoW *float64 `json:"pow"`
Hash hexutil.Bytes `json:"hash"`
Dst hexutil.Bytes `json:"recipientPublicKey,omitempty"`
}
var dec Message
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.Sig != nil {
m.Sig = dec.Sig
}
if dec.TTL != nil {
m.TTL = *dec.TTL
}
if dec.Timestamp != nil {
m.Timestamp = *dec.Timestamp
}
if dec.Topic != nil {
m.Topic = *dec.Topic
}
if dec.Payload != nil {
m.Payload = dec.Payload
}
if dec.Padding != nil {
m.Padding = dec.Padding
}
if dec.PoW != nil {
m.PoW = *dec.PoW
}
if dec.Hash != nil {
m.Hash = dec.Hash
}
if dec.Dst != nil {
m.Dst = dec.Dst
}
return nil
}

View File

@ -0,0 +1,86 @@
// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
package whisperv5
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func (n NewMessage) MarshalJSON() ([]byte, error) {
type NewMessage struct {
SymKeyID string `json:"symKeyID"`
PublicKey hexutil.Bytes `json:"pubKey"`
Sig string `json:"sig"`
TTL uint32 `json:"ttl"`
Topic TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PowTime uint32 `json:"powTime"`
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
}
var enc NewMessage
enc.SymKeyID = n.SymKeyID
enc.PublicKey = n.PublicKey
enc.Sig = n.Sig
enc.TTL = n.TTL
enc.Topic = n.Topic
enc.Payload = n.Payload
enc.Padding = n.Padding
enc.PowTime = n.PowTime
enc.PowTarget = n.PowTarget
enc.TargetPeer = n.TargetPeer
return json.Marshal(&enc)
}
func (n *NewMessage) UnmarshalJSON(input []byte) error {
type NewMessage struct {
SymKeyID *string `json:"symKeyID"`
PublicKey hexutil.Bytes `json:"pubKey"`
Sig *string `json:"sig"`
TTL *uint32 `json:"ttl"`
Topic *TopicType `json:"topic"`
Payload hexutil.Bytes `json:"payload"`
Padding hexutil.Bytes `json:"padding"`
PowTime *uint32 `json:"powTime"`
PowTarget *float64 `json:"powTarget"`
TargetPeer *string `json:"targetPeer"`
}
var dec NewMessage
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.SymKeyID != nil {
n.SymKeyID = *dec.SymKeyID
}
if dec.PublicKey != nil {
n.PublicKey = dec.PublicKey
}
if dec.Sig != nil {
n.Sig = *dec.Sig
}
if dec.TTL != nil {
n.TTL = *dec.TTL
}
if dec.Topic != nil {
n.Topic = *dec.Topic
}
if dec.Payload != nil {
n.Payload = dec.Payload
}
if dec.Padding != nil {
n.Padding = dec.Padding
}
if dec.PowTime != nil {
n.PowTime = *dec.PowTime
}
if dec.PowTarget != nil {
n.PowTarget = *dec.PowTarget
}
if dec.TargetPeer != nil {
n.TargetPeer = *dec.TargetPeer
}
return nil
}

View File

@ -49,7 +49,7 @@ type MessageParams struct {
// SentMessage represents an end-user data packet to transmit through the // SentMessage represents an end-user data packet to transmit through the
// Whisper protocol. These are wrapped into Envelopes that need not be // Whisper protocol. These are wrapped into Envelopes that need not be
// understood by intermediate nodes, just forwarded. // understood by intermediate nodes, just forwarded.
type SentMessage struct { type sentMessage struct {
Raw []byte Raw []byte
} }
@ -87,8 +87,8 @@ func (msg *ReceivedMessage) isAsymmetricEncryption() bool {
} }
// NewMessage creates and initializes a non-signed, non-encrypted Whisper message. // NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewSentMessage(params *MessageParams) (*SentMessage, error) { func NewSentMessage(params *MessageParams) (*sentMessage, error) {
msg := SentMessage{} msg := sentMessage{}
msg.Raw = make([]byte, 1, len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit) msg.Raw = make([]byte, 1, len(params.Payload)+len(params.Padding)+signatureLength+padSizeLimit)
msg.Raw[0] = 0 // set all the flags to zero msg.Raw[0] = 0 // set all the flags to zero
err := msg.appendPadding(params) err := msg.appendPadding(params)
@ -119,7 +119,7 @@ func intSize(i int) (s int) {
// appendPadding appends the pseudorandom padding bytes and sets the padding flag. // appendPadding appends the pseudorandom padding bytes and sets the padding flag.
// The last byte contains the size of padding (thus, its size must not exceed 256). // The last byte contains the size of padding (thus, its size must not exceed 256).
func (msg *SentMessage) appendPadding(params *MessageParams) error { func (msg *sentMessage) appendPadding(params *MessageParams) error {
rawSize := len(params.Payload) + 1 rawSize := len(params.Payload) + 1
if params.Src != nil { if params.Src != nil {
rawSize += signatureLength rawSize += signatureLength
@ -164,7 +164,7 @@ func (msg *SentMessage) appendPadding(params *MessageParams) error {
// sign calculates and sets the cryptographic signature for the message, // sign calculates and sets the cryptographic signature for the message,
// also setting the sign flag. // also setting the sign flag.
func (msg *SentMessage) sign(key *ecdsa.PrivateKey) error { func (msg *sentMessage) sign(key *ecdsa.PrivateKey) error {
if isMessageSigned(msg.Raw[0]) { if isMessageSigned(msg.Raw[0]) {
// this should not happen, but no reason to panic // this should not happen, but no reason to panic
log.Error("failed to sign the message: already signed") log.Error("failed to sign the message: already signed")
@ -183,7 +183,7 @@ func (msg *SentMessage) sign(key *ecdsa.PrivateKey) error {
} }
// encryptAsymmetric encrypts a message with a public key. // encryptAsymmetric encrypts a message with a public key.
func (msg *SentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error { func (msg *sentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error {
if !ValidatePublicKey(key) { if !ValidatePublicKey(key) {
return errors.New("invalid public key provided for asymmetric encryption") return errors.New("invalid public key provided for asymmetric encryption")
} }
@ -196,7 +196,7 @@ func (msg *SentMessage) encryptAsymmetric(key *ecdsa.PublicKey) error {
// encryptSymmetric encrypts a message with a topic key, using AES-GCM-256. // encryptSymmetric encrypts a message with a topic key, using AES-GCM-256.
// nonce size should be 12 bytes (see cipher.gcmStandardNonceSize). // nonce size should be 12 bytes (see cipher.gcmStandardNonceSize).
func (msg *SentMessage) encryptSymmetric(key []byte) (nonce []byte, err error) { func (msg *sentMessage) encryptSymmetric(key []byte) (nonce []byte, err error) {
if !validateSymmetricKey(key) { if !validateSymmetricKey(key) {
return nil, errors.New("invalid key provided for symmetric encryption") return nil, errors.New("invalid key provided for symmetric encryption")
} }
@ -224,13 +224,12 @@ func (msg *SentMessage) encryptSymmetric(key []byte) (nonce []byte, err error) {
} }
// Wrap bundles the message into an Envelope to transmit over the network. // Wrap bundles the message into an Envelope to transmit over the network.
func (msg *SentMessage) Wrap(options *MessageParams) (envelope *Envelope, err error) { func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err error) {
if options.TTL == 0 { if options.TTL == 0 {
options.TTL = DefaultTTL options.TTL = DefaultTTL
} }
if options.Src != nil { if options.Src != nil {
err = msg.sign(options.Src) if err = msg.sign(options.Src); err != nil {
if err != nil {
return nil, err return nil, err
} }
} }
@ -242,14 +241,12 @@ func (msg *SentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
} else { } else {
err = errors.New("unable to encrypt the message: neither symmetric nor assymmetric key provided") err = errors.New("unable to encrypt the message: neither symmetric nor assymmetric key provided")
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
envelope = NewEnvelope(options.TTL, options.Topic, nonce, msg) envelope = NewEnvelope(options.TTL, options.Topic, nonce, msg)
err = envelope.Seal(options) if err = envelope.Seal(options); err != nil {
if err != nil {
return nil, err return nil, err
} }
return envelope, nil return envelope, nil

View File

@ -113,7 +113,7 @@ func initialize(t *testing.T) {
for i := 0; i < NumNodes; i++ { for i := 0; i < NumNodes; i++ {
var node TestNode var node TestNode
node.shh = New() node.shh = New(&DefaultConfig)
node.shh.SetMinimumPoW(0.00000001) node.shh.SetMinimumPoW(0.00000001)
node.shh.Start(nil) node.shh.Start(nil)
topics := make([]TopicType, 0) topics := make([]TopicType, 0)

View File

@ -19,10 +19,8 @@
package whisperv5 package whisperv5
import ( import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
) )
// Topic represents a cryptographically secure, probabilistic partial // Topic represents a cryptographically secure, probabilistic partial
@ -46,24 +44,12 @@ func (topic *TopicType) String() string {
return string(common.ToHex(topic[:])) return string(common.ToHex(topic[:]))
} }
// UnmarshalJSON parses a hex representation to a topic. // MarshalText returns the hex representation of t.
func (t *TopicType) UnmarshalJSON(input []byte) error { func (t TopicType) MarshalText() ([]byte, error) {
length := len(input) return hexutil.Bytes(t[:]).MarshalText()
if length >= 2 && input[0] == '"' && input[length-1] == '"' { }
input = input[1 : length-1]
} // UnmarshalText parses a hex representation to a topic.
// strip "0x" for length check func (t *TopicType) UnmarshalText(input []byte) error {
if len(input) > 1 && strings.ToLower(string(input[:2])) == "0x" { return hexutil.UnmarshalFixedText("Topic", input, t[:])
input = input[2:]
}
// validate the length of the input
if len(input) != TopicLength*2 {
return fmt.Errorf("unmarshalJSON failed: topic must be exactly %d bytes", TopicLength)
}
b := common.FromHex(string(input))
if b == nil {
return fmt.Errorf("unmarshalJSON failed: wrong topic format")
}
*t = BytesToTopic(b)
return nil
} }

View File

@ -16,7 +16,10 @@
package whisperv5 package whisperv5
import "testing" import (
"encoding/json"
"testing"
)
var topicStringTests = []struct { var topicStringTests = []struct {
topic TopicType topic TopicType
@ -53,6 +56,38 @@ var bytesToTopicTests = []struct {
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: nil}, {topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: nil},
} }
var unmarshalTestsGood = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x00000000"`)},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, data: []byte(`"0x007f80ff"`)},
{topic: TopicType{0xff, 0x80, 0x7f, 0x00}, data: []byte(`"0xff807f00"`)},
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, data: []byte(`"0xf26e7779"`)},
}
var unmarshalTestsBad = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x0000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0x0000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"0000000000"`)},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte(`"abcdefg0"`)},
}
var unmarshalTestsUgly = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x01, 0x00, 0x00, 0x00}, data: []byte(`"0x00000001"`)},
}
func TestBytesToTopic(t *testing.T) { func TestBytesToTopic(t *testing.T) {
for i, tst := range bytesToTopicTests { for i, tst := range bytesToTopicTests {
top := BytesToTopic(tst.data) top := BytesToTopic(tst.data)
@ -62,51 +97,14 @@ func TestBytesToTopic(t *testing.T) {
} }
} }
var unmarshalTestsGood = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0x00000000")},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, data: []byte("0x007f80ff")},
{topic: TopicType{0xff, 0x80, 0x7f, 0x00}, data: []byte("0xff807f00")},
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, data: []byte("0xf26e7779")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("00000000")},
{topic: TopicType{0x00, 0x80, 0x01, 0x00}, data: []byte("00800100")},
{topic: TopicType{0x00, 0x7f, 0x80, 0xff}, data: []byte("007f80ff")},
{topic: TopicType{0xff, 0x80, 0x7f, 0x00}, data: []byte("ff807f00")},
{topic: TopicType{0xf2, 0x6e, 0x77, 0x79}, data: []byte("f26e7779")},
}
var unmarshalTestsBad = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0x000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0x0000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0x000000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0x0000000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("000000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("0000000000")},
{topic: TopicType{0x00, 0x00, 0x00, 0x00}, data: []byte("abcdefg0")},
}
var unmarshalTestsUgly = []struct {
topic TopicType
data []byte
}{
{topic: TopicType{0x01, 0x00, 0x00, 0x00}, data: []byte("00000001")},
}
func TestUnmarshalTestsGood(t *testing.T) { func TestUnmarshalTestsGood(t *testing.T) {
for i, tst := range unmarshalTestsGood { for i, tst := range unmarshalTestsGood {
var top TopicType var top TopicType
err := top.UnmarshalJSON(tst.data) err := json.Unmarshal(tst.data, &top)
if err != nil { if err != nil {
t.Fatalf("failed test %d. input: %v.", i, tst.data) t.Errorf("failed test %d. input: %v. err: %v", i, tst.data, err)
} else if top != tst.topic { } else if top != tst.topic {
t.Fatalf("failed test %d: have %v, want %v.", i, t, tst.topic) t.Errorf("failed test %d: have %v, want %v.", i, t, tst.topic)
} }
} }
} }
@ -115,7 +113,7 @@ func TestUnmarshalTestsBad(t *testing.T) {
// in this test UnmarshalJSON() is supposed to fail // in this test UnmarshalJSON() is supposed to fail
for i, tst := range unmarshalTestsBad { for i, tst := range unmarshalTestsBad {
var top TopicType var top TopicType
err := top.UnmarshalJSON(tst.data) err := json.Unmarshal(tst.data, &top)
if err == nil { if err == nil {
t.Fatalf("failed test %d. input: %v.", i, tst.data) t.Fatalf("failed test %d. input: %v.", i, tst.data)
} }
@ -126,11 +124,11 @@ func TestUnmarshalTestsUgly(t *testing.T) {
// in this test UnmarshalJSON() is NOT supposed to fail, but result should be wrong // in this test UnmarshalJSON() is NOT supposed to fail, but result should be wrong
for i, tst := range unmarshalTestsUgly { for i, tst := range unmarshalTestsUgly {
var top TopicType var top TopicType
err := top.UnmarshalJSON(tst.data) err := json.Unmarshal(tst.data, &top)
if err != nil { if err != nil {
t.Fatalf("failed test %d. input: %v.", i, tst.data) t.Errorf("failed test %d. input: %v.", i, tst.data)
} else if top == tst.topic { } else if top == tst.topic {
t.Fatalf("failed test %d: have %v, want %v.", i, top, tst.topic) t.Errorf("failed test %d: have %v, want %v.", i, top, tst.topic)
} }
} }
} }

View File

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/pbkdf2"
"golang.org/x/sync/syncmap"
set "gopkg.in/fatih/set.v0" set "gopkg.in/fatih/set.v0"
) )
@ -44,6 +45,12 @@ type Statistics struct {
totalMessagesCleared int totalMessagesCleared int
} }
const (
minPowIdx = iota // Minimal PoW required by the whisper node
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
overflowIdx = iota // Indicator of message queue overflow
)
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Whisper struct { type Whisper struct {
@ -54,28 +61,31 @@ type Whisper struct {
symKeys map[string][]byte // Symmetric key storage symKeys map[string][]byte // Symmetric key storage
keyMu sync.RWMutex // Mutex associated with key storages keyMu sync.RWMutex // Mutex associated with key storages
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node
expirations map[uint32]*set.SetNonTS // Message expiration pool expirations map[uint32]*set.SetNonTS // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
peers map[*Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set peerMu sync.RWMutex // Mutex to sync the active peer set
peers map[*Peer]struct{} // Set of currently active peers
messageQueue chan *Envelope // Message queue for normal whisper messages messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
quit chan struct{} // Channel used for graceful exit quit chan struct{} // Channel used for graceful exit
minPoW float64 // Minimal PoW required by the whisper node settings syncmap.Map // holds configuration settings that can be dynamically changed
maxMsgLength int // Maximal message length allowed by the whisper node
overflow bool // Indicator of message queue overflow
stats Statistics // Statistics of whisper node statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface mailServer MailServer // MailServer interface
} }
// New creates a Whisper client ready to communicate through the Ethereum P2P network. // New creates a Whisper client ready to communicate through the Ethereum P2P network.
func New() *Whisper { func New(cfg *Config) *Whisper {
if cfg == nil {
cfg = &DefaultConfig
}
whisper := &Whisper{ whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey), privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte), symKeys: make(map[string][]byte),
@ -85,22 +95,49 @@ func New() *Whisper {
messageQueue: make(chan *Envelope, messageQueueLimit), messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit), p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}), quit: make(chan struct{}),
minPoW: DefaultMinimumPoW,
maxMsgLength: DefaultMaxMessageLength,
} }
whisper.filters = NewFilters(whisper) whisper.filters = NewFilters(whisper)
whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
whisper.settings.Store(overflowIdx, false)
// p2p whisper sub protocol handler // p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{ whisper.protocol = p2p.Protocol{
Name: ProtocolName, Name: ProtocolName,
Version: uint(ProtocolVersion), Version: uint(ProtocolVersion),
Length: NumberOfMessageCodes, Length: NumberOfMessageCodes,
Run: whisper.HandlePeer, Run: whisper.HandlePeer,
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": ProtocolVersionStr,
"maxMessageSize": whisper.MaxMessageSize(),
"minimumPoW": whisper.MinPow(),
}
},
} }
return whisper return whisper
} }
func (w *Whisper) MinPow() float64 {
val, _ := w.settings.Load(minPowIdx)
return val.(float64)
}
// MaxMessageSize returns the maximum accepted message size.
func (w *Whisper) MaxMessageSize() uint32 {
val, _ := w.settings.Load(maxMsgSizeIdx)
return val.(uint32)
}
// Overflow returns an indication if the message queue is full.
func (w *Whisper) Overflow() bool {
val, _ := w.settings.Load(overflowIdx)
return val.(bool)
}
// APIs returns the RPC descriptors the Whisper implementation offers // APIs returns the RPC descriptors the Whisper implementation offers
func (w *Whisper) APIs() []rpc.API { func (w *Whisper) APIs() []rpc.API {
return []rpc.API{ return []rpc.API{
@ -129,12 +166,12 @@ func (w *Whisper) Version() uint {
return w.protocol.Version return w.protocol.Version
} }
// SetMaxMessageLength sets the maximal message length allowed by this node // SetMaxMessageSize sets the maximal message size allowed by this node
func (w *Whisper) SetMaxMessageLength(val int) error { func (w *Whisper) SetMaxMessageSize(size uint32) error {
if val <= 0 { if size > MaxMessageSize {
return fmt.Errorf("invalid message length: %d", val) return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize)
} }
w.maxMsgLength = val w.settings.Store(maxMsgSizeIdx, uint32(size))
return nil return nil
} }
@ -143,7 +180,7 @@ func (w *Whisper) SetMinimumPoW(val float64) error {
if val <= 0.0 { if val <= 0.0 {
return fmt.Errorf("invalid PoW: %f", val) return fmt.Errorf("invalid PoW: %f", val)
} }
w.minPoW = val w.settings.Store(minPowIdx, val)
return nil return nil
} }
@ -240,6 +277,20 @@ func (w *Whisper) DeleteKeyPair(key string) bool {
return false return false
} }
// AddKeyPair imports a asymmetric private key and returns it identifier.
func (w *Whisper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
id, err := GenerateRandomID()
if err != nil {
return "", fmt.Errorf("failed to generate ID: %s", err)
}
w.keyMu.Lock()
w.privateKeys[id] = key
w.keyMu.Unlock()
return id, nil
}
// HasKeyPair checks if the the whisper node is configured with the private key // HasKeyPair checks if the the whisper node is configured with the private key
// of the specified public pair. // of the specified public pair.
func (w *Whisper) HasKeyPair(id string) bool { func (w *Whisper) HasKeyPair(id string) bool {
@ -451,7 +502,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("message loop", "peer", p.peer.ID(), "err", err) log.Warn("message loop", "peer", p.peer.ID(), "err", err)
return err return err
} }
if packet.Size > uint32(wh.maxMsgLength) { if packet.Size > wh.MaxMessageSize() {
log.Warn("oversized message received", "peer", p.peer.ID()) log.Warn("oversized message received", "peer", p.peer.ID())
return errors.New("oversized message received") return errors.New("oversized message received")
} }
@ -532,7 +583,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
} }
} }
if envelope.size() > wh.maxMsgLength { if uint32(envelope.size()) > wh.MaxMessageSize() {
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
} }
@ -547,7 +598,7 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash()) return false, fmt.Errorf("wrong size of AESNonce: %d bytes [env: %x]", aesNonceSize, envelope.Hash())
} }
if envelope.PoW() < wh.minPoW { if envelope.PoW() < wh.MinPow() {
log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex()) log.Debug("envelope with low PoW dropped", "PoW", envelope.PoW(), "hash", envelope.Hash().Hex())
return false, nil // drop envelope without error return false, nil // drop envelope without error
} }
@ -571,7 +622,9 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
} else { } else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size() wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock()
wh.postEvent(envelope, false) // notify the local node about the new message wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil { if wh.mailServer != nil {
wh.mailServer.Archive(envelope) wh.mailServer.Archive(envelope)
@ -600,13 +653,13 @@ func (w *Whisper) checkOverflow() {
queueSize := len(w.messageQueue) queueSize := len(w.messageQueue)
if queueSize == messageQueueLimit { if queueSize == messageQueueLimit {
if !w.overflow { if !w.Overflow() {
w.overflow = true w.settings.Store(overflowIdx, true)
log.Warn("message queue overflow") log.Warn("message queue overflow")
} }
} else if queueSize <= messageQueueLimit/2 { } else if queueSize <= messageQueueLimit/2 {
if w.overflow { if w.Overflow() {
w.overflow = false w.settings.Store(overflowIdx, false)
log.Warn("message queue overflow fixed (back to normal)") log.Warn("message queue overflow fixed (back to normal)")
} }
} }
@ -653,6 +706,8 @@ func (w *Whisper) expire() {
w.poolMu.Lock() w.poolMu.Lock()
defer w.poolMu.Unlock() defer w.poolMu.Unlock()
w.statsMu.Lock()
defer w.statsMu.Unlock()
w.stats.reset() w.stats.reset()
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
for expiry, hashSet := range w.expirations { for expiry, hashSet := range w.expirations {
@ -673,17 +728,11 @@ func (w *Whisper) expire() {
} }
// Stats returns the whisper node statistics. // Stats returns the whisper node statistics.
func (w *Whisper) Stats() string { func (w *Whisper) Stats() Statistics {
result := fmt.Sprintf("Memory usage: %d bytes. Average messages cleared per expiry cycle: %d. Total messages cleared: %d.", w.statsMu.Lock()
w.stats.memoryUsed, w.stats.totalMessagesCleared/w.stats.cycles, w.stats.totalMessagesCleared) defer w.statsMu.Unlock()
if w.stats.messagesCleared > 0 {
result += fmt.Sprintf(" Latest expiry cycle cleared %d messages (%d bytes).", return w.stats
w.stats.messagesCleared, w.stats.memoryCleared)
}
if w.overflow {
result += " Message queue state: overflow."
}
return result
} }
// Envelopes retrieves all the messages currently pooled by the node. // Envelopes retrieves all the messages currently pooled by the node.
@ -734,15 +783,6 @@ func (s *Statistics) reset() {
s.messagesCleared = 0 s.messagesCleared = 0
} }
// ValidateKeyID checks the format of key id.
func ValidateKeyID(id string) error {
const target = keyIdSize * 2
if len(id) != target {
return fmt.Errorf("wrong size of key ID (expected %d bytes, got %d)", target, len(id))
}
return nil
}
// ValidatePublicKey checks the format of the given public key. // ValidatePublicKey checks the format of the given public key.
func ValidatePublicKey(k *ecdsa.PublicKey) bool { func ValidatePublicKey(k *ecdsa.PublicKey) bool {
return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0 return k != nil && k.X != nil && k.Y != nil && k.X.Sign() != 0 && k.Y.Sign() != 0

View File

@ -18,13 +18,14 @@ package whisperv5
import ( import (
"bytes" "bytes"
"crypto/ecdsa"
mrand "math/rand" mrand "math/rand"
"testing" "testing"
"time" "time"
) )
func TestWhisperBasic(t *testing.T) { func TestWhisperBasic(t *testing.T) {
w := New() w := New(&DefaultConfig)
p := w.Protocols() p := w.Protocols()
shh := p[0] shh := p[0]
if shh.Name != ProtocolName { if shh.Name != ProtocolName {
@ -117,8 +118,39 @@ func TestWhisperBasic(t *testing.T) {
} }
} }
func TestWhisperAsymmetricKeyImport(t *testing.T) {
var (
w = New(&DefaultConfig)
privateKeys []*ecdsa.PrivateKey
)
for i := 0; i < 50; i++ {
id, err := w.NewKeyPair()
if err != nil {
t.Fatalf("could not generate key: %v", err)
}
pk, err := w.GetPrivateKey(id)
if err != nil {
t.Fatalf("could not export private key: %v", err)
}
privateKeys = append(privateKeys, pk)
if !w.DeleteKeyPair(id) {
t.Fatalf("could not delete private key")
}
}
for _, pk := range privateKeys {
if _, err := w.AddKeyPair(pk); err != nil {
t.Fatalf("could not import private key: %v", err)
}
}
}
func TestWhisperIdentityManagement(t *testing.T) { func TestWhisperIdentityManagement(t *testing.T) {
w := New() w := New(&DefaultConfig)
id1, err := w.NewKeyPair() id1, err := w.NewKeyPair()
if err != nil { if err != nil {
t.Fatalf("failed to generate new key pair: %s.", err) t.Fatalf("failed to generate new key pair: %s.", err)
@ -240,7 +272,7 @@ func TestWhisperSymKeyManagement(t *testing.T) {
var err error var err error
var k1, k2 []byte var k1, k2 []byte
w := New() w := New(&DefaultConfig)
id1 := string("arbitrary-string-1") id1 := string("arbitrary-string-1")
id2 := string("arbitrary-string-2") id2 := string("arbitrary-string-2")
@ -443,7 +475,7 @@ func TestWhisperSymKeyManagement(t *testing.T) {
func TestExpiry(t *testing.T) { func TestExpiry(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New() w := New(&DefaultConfig)
w.SetMinimumPoW(0.0000001) w.SetMinimumPoW(0.0000001)
defer w.SetMinimumPoW(DefaultMinimumPoW) defer w.SetMinimumPoW(DefaultMinimumPoW)
w.Start(nil) w.Start(nil)
@ -500,9 +532,9 @@ func TestExpiry(t *testing.T) {
func TestCustomization(t *testing.T) { func TestCustomization(t *testing.T) {
InitSingleTest() InitSingleTest()
w := New() w := New(&DefaultConfig)
defer w.SetMinimumPoW(DefaultMinimumPoW) defer w.SetMinimumPoW(DefaultMinimumPoW)
defer w.SetMaxMessageLength(DefaultMaxMessageLength) defer w.SetMaxMessageSize(DefaultMaxMessageSize)
w.Start(nil) w.Start(nil)
defer w.Stop() defer w.Stop()
@ -547,13 +579,13 @@ func TestCustomization(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed Wrap with seed %d: %s.", seed, err) t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
} }
w.SetMaxMessageLength(env.size() - 1) w.SetMaxMessageSize(uint32(env.size() - 1))
err = w.Send(env) err = w.Send(env)
if err == nil { if err == nil {
t.Fatalf("successfully sent oversized envelope (seed %d): false positive.", seed) t.Fatalf("successfully sent oversized envelope (seed %d): false positive.", seed)
} }
w.SetMaxMessageLength(DefaultMaxMessageLength) w.SetMaxMessageSize(DefaultMaxMessageSize)
err = w.Send(env) err = w.Send(env)
if err != nil { if err != nil {
t.Fatalf("failed to send second envelope with seed %d: %s.", seed, err) t.Fatalf("failed to send second envelope with seed %d: %s.", seed, err)