lotus/chain/blocksync/blocksync.go

258 lines
6.0 KiB
Go
Raw Normal View History

2019-11-09 23:00:22 +00:00
package blocksync
2019-07-05 14:29:17 +00:00
import (
"bufio"
"context"
"time"
2019-07-05 14:46:21 +00:00
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2019-07-05 14:29:17 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
2019-07-08 12:51:45 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-05 14:29:17 +00:00
)
2019-11-09 23:00:22 +00:00
// log is the package-level logger, created under the "blocksync" name.
var log = logging.Logger("blocksync")
2019-07-05 14:46:21 +00:00
// NewStreamFunc is a function type that takes a context, a peer ID and any
// number of protocol IDs, and returns a network stream or an error.
// NOTE(review): presumably used to open outbound streams to a peer; no user
// of this type is visible in this part of the file — confirm against callers.
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
2019-07-05 14:29:17 +00:00
const (
	// BlockSyncProtocolID is the protocol identifier string of the block
	// sync protocol.
	BlockSyncProtocolID = "/fil/sync/blk/0.0.1"

	// BlockSyncMaxRequestLength is the largest number of tipsets served for
	// a single request; longer requests are capped to this value.
	BlockSyncMaxRequestLength = 800
)
2019-07-05 14:29:17 +00:00
type BlockSyncService struct {
2019-07-26 04:54:22 +00:00
cs *store.ChainStore
2019-07-05 14:29:17 +00:00
}
// BlockSyncRequest is the message a peer sends to ask for a segment of the
// chain.
type BlockSyncRequest struct {
	// Start holds the block CIDs forming the key of the tipset at which
	// collection begins. It must not be empty.
	Start []cid.Cid

	// RequestLength is the number of tipsets wanted. Values above
	// BlockSyncMaxRequestLength are capped by the server.
	RequestLength uint64

	// Options is a bit field of BSOpt* flags; see ParseBSOptions.
	Options uint64
}
// BSOptions is the decoded form of the Options bit field of a
// BlockSyncRequest.
type BSOptions struct {
	// IncludeBlocks reports whether block headers should be returned.
	IncludeBlocks bool

	// IncludeMessages reports whether messages should be returned.
	IncludeMessages bool
}
// ParseBSOptions decodes the option bit field of a block sync request.
//
// Each BSOpt* flag set in optfield switches on the matching field of the
// returned BSOptions; bits without a known flag are ignored.
func ParseBSOptions(optfield uint64) *BSOptions {
	wantBlocks := optfield&BSOptBlocks != 0
	wantMessages := optfield&BSOptMessages != 0

	parsed := new(BSOptions)
	parsed.IncludeBlocks = wantBlocks
	parsed.IncludeMessages = wantMessages
	return parsed
}
// Flags that may be combined in the Options field of a BlockSyncRequest.
const (
	// BSOptBlocks asks for block headers to be included in the response.
	BSOptBlocks = 1 << 0

	// BSOptMessages asks for messages to be included in the response.
	BSOptMessages = 1 << 1
)
// Status codes carried in the Status field of a BlockSyncResponse.
const (
	// StatusOK marks a response that was served as requested.
	StatusOK uint64 = 0
	// StatusPartial marks a response whose length was capped below the
	// requested length.
	StatusPartial uint64 = 101

	// StatusNotFound is not produced by the code visible in this file.
	StatusNotFound uint64 = 201
	// StatusGoAway is not produced by the code visible in this file.
	StatusGoAway uint64 = 202
	// StatusInternalError marks a request that failed while the chain
	// segment was being collected.
	StatusInternalError uint64 = 203
	// StatusBadRequest marks a request that was rejected as malformed.
	StatusBadRequest uint64 = 204
)
2019-07-05 14:29:17 +00:00
type BlockSyncResponse struct {
Chain []*BSTipSet
2019-08-22 01:29:19 +00:00
Status uint64
2019-07-05 14:29:17 +00:00
Message string
}
type BSTipSet struct {
Blocks []*types.BlockHeader
2019-07-05 14:29:17 +00:00
BlsMessages []*types.Message
2019-08-22 01:29:19 +00:00
BlsMsgIncludes [][]uint64
SecpkMessages []*types.SignedMessage
2019-08-22 01:29:19 +00:00
SecpkMsgIncludes [][]uint64
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
2019-07-05 14:29:17 +00:00
return &BlockSyncService{
cs: cs,
}
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
defer span.End()
defer s.Close() //nolint:errcheck
2019-07-05 14:29:17 +00:00
var req BlockSyncRequest
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
2019-12-16 17:14:21 +00:00
log.Warnf("failed to read block sync request: %s", err)
2019-07-05 14:29:17 +00:00
return
}
log.Infow("block sync request", "start", req.Start, "len", req.RequestLength)
2019-07-05 14:29:17 +00:00
resp, err := bss.processRequest(ctx, s.Conn().RemotePeer(), &req)
2019-07-05 14:29:17 +00:00
if err != nil {
2019-12-07 22:32:34 +00:00
log.Warn("failed to process block sync request: ", err)
2019-07-05 14:29:17 +00:00
return
}
writeDeadline := 60 * time.Second
_ = s.SetDeadline(time.Now().Add(writeDeadline))
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
2019-07-05 14:29:17 +00:00
return
}
}
func (bss *BlockSyncService) processRequest(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
_, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
2019-07-05 14:29:17 +00:00
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
Status: StatusBadRequest,
2019-08-02 22:32:02 +00:00
Message: "no cids given in blocksync request",
}, nil
}
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
trace.Int64Attribute("reqlen", int64(req.RequestLength)),
)
reqlen := req.RequestLength
if reqlen > BlockSyncMaxRequestLength {
log.Warnw("limiting blocksync request length", "orig", req.RequestLength, "peer", p)
reqlen = BlockSyncMaxRequestLength
}
chain, err := collectChainSegment(bss.cs, types.NewTipSetKey(req.Start...), reqlen, opts)
2019-07-05 14:29:17 +00:00
if err != nil {
log.Warn("encountered error while responding to block sync request: ", err)
2019-07-05 14:29:17 +00:00
return &BlockSyncResponse{
Status: StatusInternalError,
2019-11-12 11:45:25 +00:00
Message: err.Error(),
2019-07-05 14:29:17 +00:00
}, nil
}
status := StatusOK
if reqlen < req.RequestLength {
status = StatusPartial
}
2019-07-05 14:29:17 +00:00
return &BlockSyncResponse{
Chain: chain,
Status: status,
2019-07-05 14:29:17 +00:00
}, nil
}
func collectChainSegment(cs *store.ChainStore, start types.TipSetKey, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
2019-07-05 14:29:17 +00:00
var bstips []*BSTipSet
cur := start
for {
var bst BSTipSet
ts, err := cs.LoadTipSet(cur)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err)
2019-07-05 14:29:17 +00:00
}
if opts.IncludeMessages {
bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, xerrors.Errorf("gather messages failed: %w", err)
2019-07-05 14:29:17 +00:00
}
bst.BlsMessages = bmsgs
bst.BlsMsgIncludes = bmincl
bst.SecpkMessages = smsgs
bst.SecpkMsgIncludes = smincl
2019-07-05 14:29:17 +00:00
}
if opts.IncludeBlocks {
bst.Blocks = ts.Blocks()
}
bstips = append(bstips, &bst)
if uint64(len(bstips)) >= length || ts.Height() == 0 {
return bstips, nil
}
cur = ts.Parents()
}
}
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
2019-08-22 01:29:19 +00:00
blsmsgmap := make(map[cid.Cid]uint64)
secpkmsgmap := make(map[cid.Cid]uint64)
var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
2019-08-22 01:29:19 +00:00
var secpkincl, blsincl [][]uint64
2019-07-05 14:29:17 +00:00
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := cs.MessagesForBlock(b)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, nil, nil, nil, err
2019-07-05 14:29:17 +00:00
}
2019-08-22 01:29:19 +00:00
bmi := make([]uint64, 0, len(bmsgs))
for _, m := range bmsgs {
i, ok := blsmsgmap[m.Cid()]
2019-07-05 14:29:17 +00:00
if !ok {
2019-08-22 01:29:19 +00:00
i = uint64(len(blsmsgs))
blsmsgs = append(blsmsgs, m)
blsmsgmap[m.Cid()] = i
2019-07-05 14:29:17 +00:00
}
bmi = append(bmi, i)
2019-07-05 14:29:17 +00:00
}
blsincl = append(blsincl, bmi)
2019-08-22 01:29:19 +00:00
smi := make([]uint64, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
if !ok {
2019-08-22 01:29:19 +00:00
i = uint64(len(secpkmsgs))
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
}
smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
2019-07-05 14:29:17 +00:00
}
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
fts := &store.FullTipSet{}
2019-07-05 14:29:17 +00:00
for i, b := range bts.Blocks {
fb := &types.FullBlock{
2019-07-05 14:29:17 +00:00
Header: b,
}
for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
2019-07-05 14:29:17 +00:00
}
for _, mi := range bts.SecpkMsgIncludes[i] {
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
}
2019-07-05 14:29:17 +00:00
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}