whisper/whisperv6: message bundling (#15666)
Changed the communication protocol for ordinary message, according to EIP 627. Messages will be send in bundles, i.e. array of messages will be sent instead of single message.
This commit is contained in:
parent
4b939c23e4
commit
9f1007e554
@ -149,21 +149,26 @@ func (peer *Peer) expire() {
|
|||||||
// broadcast iterates over the collection of envelopes and transmits yet unknown
|
// broadcast iterates over the collection of envelopes and transmits yet unknown
|
||||||
// ones over the network.
|
// ones over the network.
|
||||||
func (p *Peer) broadcast() error {
|
func (p *Peer) broadcast() error {
|
||||||
var cnt int
|
|
||||||
envelopes := p.host.Envelopes()
|
envelopes := p.host.Envelopes()
|
||||||
|
bundle := make([]*Envelope, 0, len(envelopes))
|
||||||
for _, envelope := range envelopes {
|
for _, envelope := range envelopes {
|
||||||
if !p.marked(envelope) {
|
if !p.marked(envelope) {
|
||||||
err := p2p.Send(p.ws, messagesCode, envelope)
|
bundle = append(bundle, envelope)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
p.mark(envelope)
|
|
||||||
cnt++
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cnt > 0 {
|
|
||||||
log.Trace("broadcast", "num. messages", cnt)
|
if len(bundle) > 0 {
|
||||||
|
// transmit the batch of envelopes
|
||||||
|
if err := p2p.Send(p.ws, messagesCode, bundle); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark envelopes only if they were successfully sent
|
||||||
|
for _, e := range bundle {
|
||||||
|
p.mark(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Trace("broadcast", "num. messages", len(bundle))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -515,18 +515,26 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
|||||||
log.Warn("unxepected status message received", "peer", p.peer.ID())
|
log.Warn("unxepected status message received", "peer", p.peer.ID())
|
||||||
case messagesCode:
|
case messagesCode:
|
||||||
// decode the contained envelopes
|
// decode the contained envelopes
|
||||||
var envelope Envelope
|
var envelopes []*Envelope
|
||||||
if err := packet.Decode(&envelope); err != nil {
|
if err := packet.Decode(&envelopes); err != nil {
|
||||||
log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
return errors.New("invalid envelope")
|
return errors.New("invalid envelopes")
|
||||||
}
|
}
|
||||||
cached, err := wh.add(&envelope)
|
|
||||||
if err != nil {
|
trouble := false
|
||||||
log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
for _, env := range envelopes {
|
||||||
return errors.New("invalid envelope")
|
cached, err := wh.add(env)
|
||||||
|
if err != nil {
|
||||||
|
trouble = true
|
||||||
|
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
|
}
|
||||||
|
if cached {
|
||||||
|
p.mark(env)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if cached {
|
|
||||||
p.mark(&envelope)
|
if trouble {
|
||||||
|
return errors.New("invalid envelope")
|
||||||
}
|
}
|
||||||
case p2pCode:
|
case p2pCode:
|
||||||
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
|
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
|
||||||
|
Loading…
Reference in New Issue
Block a user