Merge pull request #1145 from filecoin-project/fix/blocksync-spam

limit blocksync response sizes
This commit is contained in:
Jakub Sztandera 2020-01-23 15:21:01 -08:00 committed by GitHub
commit 10159ba1bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 14 deletions

View File

@ -3,6 +3,7 @@ package blocksync
import (
"bufio"
"context"
"time"
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
@ -24,6 +25,8 @@ type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream,
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
const BlockSyncMaxRequestLength = 800
type BlockSyncService struct {
cs *store.ChainStore
}
@ -52,6 +55,15 @@ const (
BSOptMessages
)
const (
StatusOK = uint64(0)
StatusPartial = uint64(101)
StatusNotFound = uint64(201)
StatusGoAway = uint64(202)
StatusInternalError = uint64(203)
StatusBadRequest = uint64(204)
)
type BlockSyncResponse struct {
Chain []*BSTipSet
@ -86,28 +98,30 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
log.Warnf("failed to read block sync request: %s", err)
return
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
log.Infow("block sync request", "start", req.Start, "len", req.RequestLength)
resp, err := bss.processRequest(ctx, &req)
resp, err := bss.processRequest(ctx, s.Conn().RemotePeer(), &req)
if err != nil {
log.Warn("failed to process block sync request: ", err)
return
}
writeDeadline := 60 * time.Second
s.SetDeadline(time.Now().Add(writeDeadline))
if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Warn("failed to write back response for handle stream: ", err)
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
return
}
}
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
func (bss *BlockSyncService) processRequest(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
_, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
Status: 204,
Status: StatusBadRequest,
Message: "no cids given in blocksync request",
}, nil
}
@ -115,20 +129,32 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
trace.Int64Attribute("reqlen", int64(req.RequestLength)),
)
chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), req.RequestLength, opts)
reqlen := req.RequestLength
if reqlen > BlockSyncMaxRequestLength {
log.Warnw("limiting blocksync request length", "orig", req.RequestLength, "peer", p)
reqlen = BlockSyncMaxRequestLength
}
chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), reqlen, opts)
if err != nil {
log.Warn("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{
Status: 203,
Status: StatusInternalError,
Message: err.Error(),
}, nil
}
status := StatusOK
if reqlen < req.RequestLength {
status = StatusPartial
}
return &BlockSyncResponse{
Chain: chain,
Status: 0,
Status: status,
}, nil
}

View File

@ -45,15 +45,15 @@ func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host, pmgr peermg
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
switch res.Status {
case 101: // Partial Response
case StatusPartial: // Partial Response
return xerrors.Errorf("not handling partial blocksync responses yet")
case 201: // req.Start not found
case StatusNotFound: // req.Start not found
return xerrors.Errorf("not found")
case 202: // Go Away
case StatusGoAway: // Go Away
return xerrors.Errorf("not handling 'go away' blocksync responses yet")
case 203: // Internal Error
case StatusInternalError: // Internal Error
return xerrors.Errorf("block sync peer errored: %s", res.Message)
case 204:
case StatusBadRequest:
return xerrors.Errorf("block sync request invalid: %s", res.Message)
default:
return xerrors.Errorf("unrecognized response code: %d", res.Status)
@ -195,11 +195,16 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
continue
}
if res.Status == 0 {
if res.Status == StatusOK {
bs.syncPeers.logGlobalSuccess(time.Since(start))
return res.Chain, nil
}
if res.Status == StatusPartial {
log.Warn("dont yet handle partial responses")
continue
}
err = bs.processStatus(req, res)
if err != nil {
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)