limit blocksync response sizes
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
7f1b12d29e
commit
792a2cad82
@ -3,6 +3,7 @@ package blocksync
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/protocol"
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
@ -24,6 +25,8 @@ type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream,
|
|||||||
|
|
||||||
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
||||||
|
|
||||||
|
const BlockSyncMaxRequestLength = 800
|
||||||
|
|
||||||
type BlockSyncService struct {
|
type BlockSyncService struct {
|
||||||
cs *store.ChainStore
|
cs *store.ChainStore
|
||||||
}
|
}
|
||||||
@ -52,6 +55,15 @@ const (
|
|||||||
BSOptMessages
|
BSOptMessages
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
StatusOK = uint64(0)
|
||||||
|
StatusPartial = uint64(101)
|
||||||
|
StatusNotFound = uint64(201)
|
||||||
|
StatusGoAway = uint64(202)
|
||||||
|
StatusInternalError = uint64(203)
|
||||||
|
StatusBadRequest = uint64(204)
|
||||||
|
)
|
||||||
|
|
||||||
type BlockSyncResponse struct {
|
type BlockSyncResponse struct {
|
||||||
Chain []*BSTipSet
|
Chain []*BSTipSet
|
||||||
|
|
||||||
@ -86,28 +98,30 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
|||||||
log.Warnf("failed to read block sync request: %s", err)
|
log.Warnf("failed to read block sync request: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
|
log.Infow("block sync request", "start", req.Start, "len", req.RequestLength)
|
||||||
|
|
||||||
resp, err := bss.processRequest(ctx, &req)
|
resp, err := bss.processRequest(ctx, s.Conn().RemotePeer(), &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to process block sync request: ", err)
|
log.Warn("failed to process block sync request: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeDeadline := 60 * time.Second
|
||||||
|
s.SetDeadline(time.Now().Add(writeDeadline))
|
||||||
if err := cborutil.WriteCborRPC(s, resp); err != nil {
|
if err := cborutil.WriteCborRPC(s, resp); err != nil {
|
||||||
log.Warn("failed to write back response for handle stream: ", err)
|
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
func (bss *BlockSyncService) processRequest(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||||
_, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
|
_, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
opts := ParseBSOptions(req.Options)
|
opts := ParseBSOptions(req.Options)
|
||||||
if len(req.Start) == 0 {
|
if len(req.Start) == 0 {
|
||||||
return &BlockSyncResponse{
|
return &BlockSyncResponse{
|
||||||
Status: 204,
|
Status: StatusBadRequest,
|
||||||
Message: "no cids given in blocksync request",
|
Message: "no cids given in blocksync request",
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -115,20 +129,32 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
|
|||||||
span.AddAttributes(
|
span.AddAttributes(
|
||||||
trace.BoolAttribute("blocks", opts.IncludeBlocks),
|
trace.BoolAttribute("blocks", opts.IncludeBlocks),
|
||||||
trace.BoolAttribute("messages", opts.IncludeMessages),
|
trace.BoolAttribute("messages", opts.IncludeMessages),
|
||||||
|
trace.Int64Attribute("reqlen", int64(req.RequestLength)),
|
||||||
)
|
)
|
||||||
|
|
||||||
chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), req.RequestLength, opts)
|
reqlen := req.RequestLength
|
||||||
|
if reqlen > BlockSyncMaxRequestLength {
|
||||||
|
log.Warnw("limiting blocksync request length", "orig", req.RequestLength, "peer", p)
|
||||||
|
reqlen = BlockSyncMaxRequestLength
|
||||||
|
}
|
||||||
|
|
||||||
|
chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), reqlen, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("encountered error while responding to block sync request: ", err)
|
log.Warn("encountered error while responding to block sync request: ", err)
|
||||||
return &BlockSyncResponse{
|
return &BlockSyncResponse{
|
||||||
Status: 203,
|
Status: StatusInternalError,
|
||||||
Message: err.Error(),
|
Message: err.Error(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
status := StatusOK
|
||||||
|
if reqlen < req.RequestLength {
|
||||||
|
status = StatusPartial
|
||||||
|
}
|
||||||
|
|
||||||
return &BlockSyncResponse{
|
return &BlockSyncResponse{
|
||||||
Chain: chain,
|
Chain: chain,
|
||||||
Status: 0,
|
Status: status,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,15 +45,15 @@ func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermg
|
|||||||
|
|
||||||
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
||||||
switch res.Status {
|
switch res.Status {
|
||||||
case 101: // Partial Response
|
case StatusPartial: // Partial Response
|
||||||
return xerrors.Errorf("not handling partial blocksync responses yet")
|
return xerrors.Errorf("not handling partial blocksync responses yet")
|
||||||
case 201: // req.Start not found
|
case StatusNotFound: // req.Start not found
|
||||||
return xerrors.Errorf("not found")
|
return xerrors.Errorf("not found")
|
||||||
case 202: // Go Away
|
case StatusGoAway: // Go Away
|
||||||
return xerrors.Errorf("not handling 'go away' blocksync responses yet")
|
return xerrors.Errorf("not handling 'go away' blocksync responses yet")
|
||||||
case 203: // Internal Error
|
case StatusInternalError: // Internal Error
|
||||||
return xerrors.Errorf("block sync peer errored: %s", res.Message)
|
return xerrors.Errorf("block sync peer errored: %s", res.Message)
|
||||||
case 204:
|
case StatusBadRequest:
|
||||||
return xerrors.Errorf("block sync request invalid: %s", res.Message)
|
return xerrors.Errorf("block sync request invalid: %s", res.Message)
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("unrecognized response code: %d", res.Status)
|
return xerrors.Errorf("unrecognized response code: %d", res.Status)
|
||||||
@ -195,11 +195,16 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Status == 0 {
|
if res.Status == StatusOK {
|
||||||
bs.syncPeers.logGlobalSuccess(time.Since(start))
|
bs.syncPeers.logGlobalSuccess(time.Since(start))
|
||||||
return res.Chain, nil
|
return res.Chain, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if res.Status == StatusPartial {
|
||||||
|
log.Warn("dont yet handle partial responses")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
err = bs.processStatus(req, res)
|
err = bs.processStatus(req, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
|
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
|
||||||
|
Loading…
Reference in New Issue
Block a user