swarm/network/stream: disambiguate chunk delivery messages (retrieval… (#17920)
* swarm/network/stream: disambiguate chunk delivery messages (retrieval vs syncing) * swarm/network/stream: addressed PR comments * swarm/network/stream: stream protocol version change due to new message types in this PR
This commit is contained in:
parent
66debd91d9
commit
88b41a9e68
@ -173,7 +173,8 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
|
||||
return
|
||||
}
|
||||
if req.SkipCheck {
|
||||
err = sp.Deliver(ctx, chunk, s.priority)
|
||||
syncing := false
|
||||
err = sp.Deliver(ctx, chunk, s.priority, syncing)
|
||||
if err != nil {
|
||||
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
|
||||
}
|
||||
@ -189,12 +190,22 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
|
||||
return nil
|
||||
}
|
||||
|
||||
//Chunk delivery always uses the same message type....
|
||||
type ChunkDeliveryMsg struct {
|
||||
Addr storage.Address
|
||||
SData []byte // the stored chunk Data (incl size)
|
||||
peer *Peer // set in handleChunkDeliveryMsg
|
||||
}
|
||||
|
||||
//...but swap accounting needs to disambiguate if it is a delivery for syncing or for retrieval
|
||||
//as it decides based on message type if it needs to account for this message or not
|
||||
|
||||
//defines a chunk delivery for retrieval (with accounting)
|
||||
type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
|
||||
|
||||
//defines a chunk delivery for syncing (without accounting)
|
||||
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
|
||||
|
||||
// TODO: Fix context SNAFU
|
||||
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
|
||||
var osp opentracing.Span
|
||||
|
@ -357,7 +357,8 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
|
||||
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
|
||||
}
|
||||
chunk := storage.NewChunk(hash, data)
|
||||
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
|
||||
syncing := true
|
||||
if err := p.Deliver(ctx, chunk, s.priority, syncing); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -128,17 +128,34 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
|
||||
}
|
||||
|
||||
// Deliver sends a storeRequestMsg protocol message to the peer
|
||||
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
|
||||
// Depending on the `syncing` parameter we send different message types
|
||||
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
|
||||
var sp opentracing.Span
|
||||
var msg interface{}
|
||||
|
||||
spanName := "send.chunk.delivery"
|
||||
|
||||
//we send different types of messages if delivery is for syncing or retrievals,
|
||||
//even if handling and content of the message are the same,
|
||||
//because swap accounting decides which messages need accounting based on the message type
|
||||
if syncing {
|
||||
msg = &ChunkDeliveryMsgSyncing{
|
||||
Addr: chunk.Address(),
|
||||
SData: chunk.Data(),
|
||||
}
|
||||
spanName += ".syncing"
|
||||
} else {
|
||||
msg = &ChunkDeliveryMsgRetrieval{
|
||||
Addr: chunk.Address(),
|
||||
SData: chunk.Data(),
|
||||
}
|
||||
spanName += ".retrieval"
|
||||
}
|
||||
ctx, sp = spancontext.StartSpan(
|
||||
ctx,
|
||||
"send.chunk.delivery")
|
||||
spanName)
|
||||
defer sp.Finish()
|
||||
|
||||
msg := &ChunkDeliveryMsg{
|
||||
Addr: chunk.Address(),
|
||||
SData: chunk.Data(),
|
||||
}
|
||||
return p.SendPriority(ctx, msg, priority)
|
||||
}
|
||||
|
||||
|
@ -489,8 +489,13 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
|
||||
case *WantedHashesMsg:
|
||||
return p.handleWantedHashesMsg(ctx, msg)
|
||||
|
||||
case *ChunkDeliveryMsg:
|
||||
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
|
||||
case *ChunkDeliveryMsgRetrieval:
|
||||
//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
|
||||
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
|
||||
|
||||
case *ChunkDeliveryMsgSyncing:
|
||||
//handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
|
||||
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
|
||||
|
||||
case *RetrieveRequestMsg:
|
||||
return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
|
||||
@ -681,7 +686,7 @@ func (c *clientParams) clientCreated() {
|
||||
// Spec is the spec of the streamer protocol
|
||||
var Spec = &protocols.Spec{
|
||||
Name: "stream",
|
||||
Version: 7,
|
||||
Version: 8,
|
||||
MaxMsgSize: 10 * 1024 * 1024,
|
||||
Messages: []interface{}{
|
||||
UnsubscribeMsg{},
|
||||
@ -690,10 +695,11 @@ var Spec = &protocols.Spec{
|
||||
TakeoverProofMsg{},
|
||||
SubscribeMsg{},
|
||||
RetrieveRequestMsg{},
|
||||
ChunkDeliveryMsg{},
|
||||
ChunkDeliveryMsgRetrieval{},
|
||||
SubscribeErrorMsg{},
|
||||
RequestSubscriptionMsg{},
|
||||
QuitMsg{},
|
||||
ChunkDeliveryMsgSyncing{},
|
||||
},
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user