package blocksync

import (
	"bufio"
	"context"
	"fmt"
	"time"

	"go.opencensus.io/trace"
	"golang.org/x/xerrors"

	cborutil "github.com/filecoin-project/go-cbor-util"

	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"

	"github.com/ipfs/go-cid"
	inet "github.com/libp2p/go-libp2p-core/network"
)

// BlockSyncService is the component that services BlockSync requests from
// peers.
//
// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync
// is an RPC-oriented protocol, with a single operation to request blocks.
//
// A request contains a start anchor block (referred to with a CID) and the
// number of blocks requested beyond the anchor (including the anchor itself).
//
// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
// two options at the moment:
//
// - include block contents
// - include block messages
//
// The response will include a status code, an optional message, and the
// response payload in case of success. The payload is a slice of serialized
// tipsets. (An illustrative request sketch follows the type definition.)
// FIXME: Rename to just `Server` (will be done later, see note on `BlockSync`).
type BlockSyncService struct {
	cs *store.ChainStore
}
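
// The function below is a purely illustrative, hedged sketch and is not used
// by the server: it shows how a client could assemble a Request for the last
// ten tipsets with both headers and messages. It assumes the option bit
// constants are named Headers and Messages elsewhere in this package; if the
// actual names differ, only the Options expression changes.
func exampleBlockSyncRequest(anchor []cid.Cid) *Request {
	return &Request{
		Head:    anchor,             // CIDs of the blocks forming the anchor tipset
		Length:  10,                 // number of tipsets requested, anchor included
		Options: Headers | Messages, // 64-bit bitfield selecting response contents
	}
}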

func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
	return &BlockSyncService{
		cs: cs,
	}
}
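
// Usage sketch (kept as a comment, since the actual wiring lives outside this
// file): the service is typically registered as a libp2p stream handler,
// roughly:
//
//	svc := NewBlockSyncService(chainStore)
//	host.SetStreamHandler(protocolID, svc.HandleStream)
//
// where host is the node's libp2p host and protocolID is the BlockSync
// protocol ID constant defined elsewhere; both names are placeholders here.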

// HandleStream is the entry point of the service; it handles incoming `Request`s.
func (server *BlockSyncService) HandleStream(stream inet.Stream) {
	ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
	defer span.End()

	defer stream.Close() //nolint:errcheck

	var req Request
	if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
		log.Warnf("failed to read block sync request: %s", err)
		return
	}
	log.Infow("block sync request",
		"start", req.Head, "len", req.Length)

	resp, err := server.processRequest(ctx, &req)
	if err != nil {
		log.Warn("failed to process request: ", err)
		return
	}

	_ = stream.SetDeadline(time.Now().Add(WRITE_RES_DEADLINE))
	if err := cborutil.WriteCborRPC(stream, resp); err != nil {
		log.Warnw("failed to write back response for handle stream",
			"err", err, "peer", stream.Conn().RemotePeer())
		return
	}
}

// processRequest validates and services the request. It returns either a
// protocol response or an internal error. The protocol response may itself
// signal a protocol error (e.g., an invalid request).
func (server *BlockSyncService) processRequest(
	ctx context.Context,
	req *Request,
) (*Response, error) {
	validReq, errResponse := validateRequest(ctx, req)
	if errResponse != nil {
		// The request did not pass validation; return the response
		// indicating it.
		return errResponse, nil
	}

	return server.serviceRequest(ctx, validReq)
}

// validateRequest validates the request. It returns either a `validatedRequest`
// or an error `Response` explaining why the request cannot be processed. We do
// not return any internal errors here; we only signal protocol ones.
func validateRequest(
	ctx context.Context,
	req *Request,
) (*validatedRequest, *Response) {
	_, span := trace.StartSpan(ctx, "blocksync.ValidateRequest")
	defer span.End()

	validReq := validatedRequest{}

	validReq.options = parseOptions(req.Options)
	if validReq.options.noOptionsSet() {
		return nil, &Response{
			Status:       BadRequest,
			ErrorMessage: "no options set",
		}
	}

	validReq.length = req.Length
	if validReq.length > MaxRequestLength {
		return nil, &Response{
			Status: BadRequest,
			ErrorMessage: fmt.Sprintf("request length over maximum allowed (%d)",
				MaxRequestLength),
		}
	}
	if validReq.length == 0 {
		return nil, &Response{
			Status:       BadRequest,
			ErrorMessage: "invalid request length of zero",
		}
	}

	if len(req.Head) == 0 {
		return nil, &Response{
			Status:       BadRequest,
			ErrorMessage: "no cids in request",
		}
	}
	validReq.head = types.NewTipSetKey(req.Head...)

	// FIXME: Add as a defer at the start.
	span.AddAttributes(
		trace.BoolAttribute("blocks", validReq.options.IncludeHeaders),
		trace.BoolAttribute("messages", validReq.options.IncludeMessages),
		trace.Int64Attribute("reqlen", int64(validReq.length)),
	)

	return &validReq, nil
}

func (server *BlockSyncService) serviceRequest(
	ctx context.Context,
	req *validatedRequest,
) (*Response, error) {
	_, span := trace.StartSpan(ctx, "blocksync.ServiceRequest")
	defer span.End()

	chain, err := collectChainSegment(server.cs, req)
	if err != nil {
		log.Warn("block sync request: collectChainSegment failed: ", err)
		return &Response{
			Status:       InternalError,
			ErrorMessage: err.Error(),
		}, nil
	}

	status := Ok
	if len(chain) < int(req.length) {
		status = Partial
	}

	return &Response{
		Chain:  chain,
		Status: status,
	}, nil
}

func collectChainSegment(
	cs *store.ChainStore,
	req *validatedRequest,
) ([]*BSTipSet, error) {
	var bstips []*BSTipSet

	cur := req.head
	for {
		var bst BSTipSet
		ts, err := cs.LoadTipSet(cur)
		if err != nil {
			return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err)
		}

		if req.options.IncludeHeaders {
			bst.Blocks = ts.Blocks()
		}

		if req.options.IncludeMessages {
			bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts)
			if err != nil {
				return nil, xerrors.Errorf("gather messages failed: %w", err)
			}

			// FIXME: Pass the response to `gatherMessages` and set all this there.
			bst.Messages = &CompactedMessages{}
			bst.Messages.Bls = bmsgs
			bst.Messages.BlsIncludes = bmincl
			bst.Messages.Secpk = smsgs
			bst.Messages.SecpkIncludes = smincl
		}

		bstips = append(bstips, &bst)

		// If we have collected the requested number of tipsets, or reached
		// the start of the chain (genesis), stop.
		if uint64(len(bstips)) >= req.length || ts.Height() == 0 {
			return bstips, nil
		}

		cur = ts.Parents()
	}
}

// gatherMessages collects the BLS and secp256k1 messages of every block in the
// tipset, deduplicated across blocks. For each message family it returns the
// deduplicated message slice along with one index slice per block (the
// "includes"), mapping that block's messages to positions in the slice.
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
	blsmsgmap := make(map[cid.Cid]uint64)
	secpkmsgmap := make(map[cid.Cid]uint64)
	var secpkmsgs []*types.SignedMessage
	var blsmsgs []*types.Message
	var secpkincl, blsincl [][]uint64

	for _, block := range ts.Blocks() {
		bmsgs, smsgs, err := cs.MessagesForBlock(block)
		if err != nil {
			return nil, nil, nil, nil, err
		}

		// FIXME: DRY. Use `chain.Message` interface.
		bmi := make([]uint64, 0, len(bmsgs))
		for _, m := range bmsgs {
			i, ok := blsmsgmap[m.Cid()]
			if !ok {
				i = uint64(len(blsmsgs))
				blsmsgs = append(blsmsgs, m)
				blsmsgmap[m.Cid()] = i
			}

			bmi = append(bmi, i)
		}
		blsincl = append(blsincl, bmi)

		smi := make([]uint64, 0, len(smsgs))
		for _, m := range smsgs {
			i, ok := secpkmsgmap[m.Cid()]
			if !ok {
				i = uint64(len(secpkmsgs))
				secpkmsgs = append(secpkmsgs, m)
				secpkmsgmap[m.Cid()] = i
			}

			smi = append(smi, i)
		}
		secpkincl = append(secpkincl, smi)
	}

	return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
}