Merge pull request #3939 from filecoin-project/fix/chain-sync-validation

Validate chain sync response indices when fetching messages
This commit is contained in:
Aayush Rajasekaran 2020-09-23 17:35:14 -04:00 committed by GitHub
commit e09d291e5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 37 deletions

View File

@ -65,7 +65,15 @@ func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Clien
// request options without disrupting external calls. In the future the // request options without disrupting external calls. In the future the
// consumers should be forced to use a more standardized service and // consumers should be forced to use a more standardized service and
// adhere to a single API derived from this function. // adhere to a single API derived from this function.
func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) { func (c *client) doRequest(
ctx context.Context,
req *Request,
singlePeer *peer.ID,
// In the `GetChainMessages` case, we won't request the headers but we still
// need them to check the integrity of the `CompactedMessages` in the response
// so the tipset blocks need to be provided by the caller.
tipsets []*types.TipSet,
) (*validatedResponse, error) {
// Validate request. // Validate request.
if req.Length == 0 { if req.Length == 0 {
return nil, xerrors.Errorf("invalid request of length 0") return nil, xerrors.Errorf("invalid request of length 0")
@ -116,7 +124,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I
} }
// Process and validate response. // Process and validate response.
validRes, err := c.processResponse(req, res) validRes, err := c.processResponse(req, res, tipsets)
if err != nil { if err != nil {
log.Warnf("processing peer %s response failed: %s", log.Warnf("processing peer %s response failed: %s",
peer.String(), err) peer.String(), err)
@ -144,7 +152,7 @@ func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.I
// errors. Peer penalization should happen here then, before returning, so // errors. Peer penalization should happen here then, before returning, so
// we can apply the correct penalties depending on the cause of the error. // we can apply the correct penalties depending on the cause of the error.
// FIXME: Add the `peer` as argument once we implement penalties. // FIXME: Add the `peer` as argument once we implement penalties.
func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) { func (c *client) processResponse(req *Request, res *Response, tipsets []*types.TipSet) (*validatedResponse, error) {
err := res.statusToError() err := res.statusToError()
if err != nil { if err != nil {
return nil, xerrors.Errorf("status error: %s", err) return nil, xerrors.Errorf("status error: %s", err)
@ -176,6 +184,16 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons
// Check for valid block sets and extract them into `TipSet`s. // Check for valid block sets and extract them into `TipSet`s.
validRes.tipsets = make([]*types.TipSet, resLength) validRes.tipsets = make([]*types.TipSet, resLength)
for i := 0; i < resLength; i++ { for i := 0; i < resLength; i++ {
if res.Chain[i] == nil {
return nil, xerrors.Errorf("response with nil tipset in pos %d", i)
}
for blockIdx, block := range res.Chain[i].Blocks {
if block == nil {
return nil, xerrors.Errorf("tipset with nil block in pos %d", blockIdx)
// FIXME: Maybe we should move this check to `NewTipSet`.
}
}
validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks) validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks)
if err != nil { if err != nil {
return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err) return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err)
@ -210,31 +228,28 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons
// If the headers were also returned check that the compression // If the headers were also returned check that the compression
// indexes are valid before `toFullTipSets()` is called by the // indexes are valid before `toFullTipSets()` is called by the
// consumer. // consumer.
for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ { err := c.validateCompressedIndices(res.Chain)
msgs := res.Chain[tipsetIdx].Messages if err != nil {
blocksNum := len(res.Chain[tipsetIdx].Blocks) return nil, err
if len(msgs.BlsIncludes) != blocksNum {
return nil, xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)",
len(msgs.BlsIncludes), blocksNum)
} }
if len(msgs.SecpkIncludes) != blocksNum { } else {
return nil, xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)", // If we didn't request the headers they should have been provided
len(msgs.SecpkIncludes), blocksNum) // by the caller.
} if len(tipsets) < len(res.Chain) {
for blockIdx := 0; blockIdx < blocksNum; blockIdx++ { return nil, xerrors.Errorf("not enought tipsets provided for message response validation, needed %d, have %d", len(res.Chain), len(tipsets))
for _, mi := range msgs.BlsIncludes[blockIdx] {
if int(mi) >= len(msgs.Bls) {
return nil, xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Bls))
}
}
for _, mi := range msgs.SecpkIncludes[blockIdx] {
if int(mi) >= len(msgs.Secpk) {
return nil, xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Secpk))
} }
chain := make([]*BSTipSet, 0, resLength)
for i, resChain := range res.Chain {
next := &BSTipSet{
Blocks: tipsets[i].Blocks(),
Messages: resChain.Messages,
} }
chain = append(chain, next)
} }
err := c.validateCompressedIndices(chain)
if err != nil {
return nil, err
} }
} }
} }
@ -242,6 +257,42 @@ func (c *client) processResponse(req *Request, res *Response) (*validatedRespons
return validRes, nil return validRes, nil
} }
func (c *client) validateCompressedIndices(chain []*BSTipSet) error {
resLength := len(chain)
for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ {
msgs := chain[tipsetIdx].Messages
blocksNum := len(chain[tipsetIdx].Blocks)
if len(msgs.BlsIncludes) != blocksNum {
return xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)",
len(msgs.BlsIncludes), blocksNum)
}
if len(msgs.SecpkIncludes) != blocksNum {
return xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)",
len(msgs.SecpkIncludes), blocksNum)
}
for blockIdx := 0; blockIdx < blocksNum; blockIdx++ {
for _, mi := range msgs.BlsIncludes[blockIdx] {
if int(mi) >= len(msgs.Bls) {
return xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Bls))
}
}
for _, mi := range msgs.SecpkIncludes[blockIdx] {
if int(mi) >= len(msgs.Secpk) {
return xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)",
mi, len(msgs.Secpk))
}
}
}
}
return nil
}
// GetBlocks implements Client.GetBlocks(). Refer to the godocs there. // GetBlocks implements Client.GetBlocks(). Refer to the godocs there.
func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
@ -259,7 +310,7 @@ func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int)
Options: Headers, Options: Headers,
} }
validRes, err := c.doRequest(ctx, req, nil) validRes, err := c.doRequest(ctx, req, nil, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -277,7 +328,7 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS
Options: Headers | Messages, Options: Headers | Messages,
} }
validRes, err := c.doRequest(ctx, req, &peer) validRes, err := c.doRequest(ctx, req, &peer, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -288,7 +339,10 @@ func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipS
} }
// GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there. // GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there.
func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) { func (c *client) GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error) {
head := tipsets[0]
length := uint64(len(tipsets))
ctx, span := trace.StartSpan(ctx, "GetChainMessages") ctx, span := trace.StartSpan(ctx, "GetChainMessages")
if span.IsRecordingEvents() { if span.IsRecordingEvents() {
span.AddAttributes( span.AddAttributes(
@ -304,7 +358,7 @@ func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, lengt
Options: Messages, Options: Messages,
} }
validRes, err := c.doRequest(ctx, req, nil) validRes, err := c.doRequest(ctx, req, nil, tipsets)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -32,10 +32,9 @@ type Client interface {
// or less. // or less.
GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error)
// GetChainMessages fetches messages from the network, from the provided // GetChainMessages fetches messages from the network, starting from the first provided tipset
// tipset *backwards*, returning the messages from as many tipsets as the // and returning messages from as many tipsets as requested or less.
// count parameter, or less. GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error)
GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error)
// GetFullTipSet fetches a full tipset from a given peer. If successful, // GetFullTipSet fetches a full tipset from a given peer. If successful,
// the fetched object contains block headers and all messages in full form. // the fetched object contains block headers and all messages in full form.

View File

@ -139,6 +139,8 @@ func (res *Response) statusToError() error {
// FIXME: Rename. // FIXME: Rename.
type BSTipSet struct { type BSTipSet struct {
// List of blocks belonging to a single tipset to which the
// `CompactedMessages` are linked.
Blocks []*types.BlockHeader Blocks []*types.BlockHeader
Messages *CompactedMessages Messages *CompactedMessages
} }

View File

@ -1553,7 +1553,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet
failed := false failed := false
for offset := 0; !failed && offset < nreq; { for offset := 0; !failed && offset < nreq; {
nextI := j + offset nextI := j + offset
nextHeader := headers[nextI] lastI := j + nreq
var requestErr error var requestErr error
var requestResult []*exchange.CompactedMessages var requestResult []*exchange.CompactedMessages
@ -1564,7 +1564,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet
log.Infof("fetching messages at %d", startOffset+nextI) log.Infof("fetching messages at %d", startOffset+nextI)
} }
result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) result, err := syncer.Exchange.GetChainMessages(ctx, headers[nextI:lastI])
if err != nil { if err != nil {
requestErr = multierror.Append(requestErr, err) requestErr = multierror.Append(requestErr, err)
} else { } else {