lotus/chain/blocksync/blocksync.go

239 lines
5.4 KiB
Go
Raw Normal View History

2019-11-09 23:00:22 +00:00
package blocksync
2019-07-05 14:29:17 +00:00
import (
"bufio"
"context"
2019-07-05 14:46:21 +00:00
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
2020-01-07 14:00:10 +00:00
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
2019-07-05 14:29:17 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
2019-07-08 12:51:45 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-05 14:29:17 +00:00
)
2019-11-09 23:00:22 +00:00
var log = logging.Logger("blocksync")
2019-07-05 14:46:21 +00:00
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
2019-07-05 14:29:17 +00:00
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
func init() {
cbor.RegisterCborType(BlockSyncRequest{})
cbor.RegisterCborType(BlockSyncResponse{})
cbor.RegisterCborType(BSTipSet{})
}
type BlockSyncService struct {
2019-07-26 04:54:22 +00:00
cs *store.ChainStore
2019-07-05 14:29:17 +00:00
}
type BlockSyncRequest struct {
Start []cid.Cid
RequestLength uint64
Options uint64
}
type BSOptions struct {
IncludeBlocks bool
IncludeMessages bool
}
func ParseBSOptions(optfield uint64) *BSOptions {
return &BSOptions{
IncludeBlocks: optfield&(BSOptBlocks) != 0,
IncludeMessages: optfield&(BSOptMessages) != 0,
}
}
const (
BSOptBlocks = 1 << 0
BSOptMessages = 1 << 1
)
type BlockSyncResponse struct {
Chain []*BSTipSet
2019-08-22 01:29:19 +00:00
Status uint64
2019-07-05 14:29:17 +00:00
Message string
}
type BSTipSet struct {
Blocks []*types.BlockHeader
2019-07-05 14:29:17 +00:00
BlsMessages []*types.Message
2019-08-22 01:29:19 +00:00
BlsMsgIncludes [][]uint64
SecpkMessages []*types.SignedMessage
2019-08-22 01:29:19 +00:00
SecpkMsgIncludes [][]uint64
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
2019-07-05 14:29:17 +00:00
return &BlockSyncService{
cs: cs,
}
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
defer span.End()
2019-07-05 14:29:17 +00:00
defer s.Close()
var req BlockSyncRequest
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
2019-12-16 17:14:21 +00:00
log.Warnf("failed to read block sync request: %s", err)
2019-07-05 14:29:17 +00:00
return
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
2019-07-05 14:29:17 +00:00
resp, err := bss.processRequest(ctx, &req)
2019-07-05 14:29:17 +00:00
if err != nil {
2019-12-07 22:32:34 +00:00
log.Warn("failed to process block sync request: ", err)
2019-07-05 14:29:17 +00:00
return
}
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, resp); err != nil {
2019-12-16 17:14:21 +00:00
log.Warn("failed to write back response for handle stream: ", err)
2019-07-05 14:29:17 +00:00
return
}
}
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
_, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
2019-07-05 14:29:17 +00:00
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
2019-08-02 22:32:02 +00:00
Status: 204,
Message: "no cids given in blocksync request",
}, nil
}
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
)
2019-12-16 19:22:56 +00:00
chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), req.RequestLength, opts)
2019-07-05 14:29:17 +00:00
if err != nil {
log.Warn("encountered error while responding to block sync request: ", err)
2019-07-05 14:29:17 +00:00
return &BlockSyncResponse{
Status: 203,
2019-11-12 11:45:25 +00:00
Message: err.Error(),
2019-07-05 14:29:17 +00:00
}, nil
}
return &BlockSyncResponse{
Chain: chain,
Status: 0,
}, nil
}
2019-12-16 19:22:56 +00:00
func (bss *BlockSyncService) collectChainSegment(start types.TipSetKey, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
2019-07-05 14:29:17 +00:00
var bstips []*BSTipSet
cur := start
for {
var bst BSTipSet
ts, err := bss.cs.LoadTipSet(cur)
if err != nil {
return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err)
2019-07-05 14:29:17 +00:00
}
if opts.IncludeMessages {
bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, xerrors.Errorf("gather messages failed: %w", err)
2019-07-05 14:29:17 +00:00
}
bst.BlsMessages = bmsgs
bst.BlsMsgIncludes = bmincl
bst.SecpkMessages = smsgs
bst.SecpkMsgIncludes = smincl
2019-07-05 14:29:17 +00:00
}
if opts.IncludeBlocks {
bst.Blocks = ts.Blocks()
}
bstips = append(bstips, &bst)
if uint64(len(bstips)) >= length || ts.Height() == 0 {
return bstips, nil
}
cur = ts.Parents()
}
}
2019-08-22 01:29:19 +00:00
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
blsmsgmap := make(map[cid.Cid]uint64)
secpkmsgmap := make(map[cid.Cid]uint64)
var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
2019-08-22 01:29:19 +00:00
var secpkincl, blsincl [][]uint64
2019-07-05 14:29:17 +00:00
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, nil, nil, nil, err
2019-07-05 14:29:17 +00:00
}
2019-08-22 01:29:19 +00:00
bmi := make([]uint64, 0, len(bmsgs))
for _, m := range bmsgs {
i, ok := blsmsgmap[m.Cid()]
2019-07-05 14:29:17 +00:00
if !ok {
2019-08-22 01:29:19 +00:00
i = uint64(len(blsmsgs))
blsmsgs = append(blsmsgs, m)
blsmsgmap[m.Cid()] = i
2019-07-05 14:29:17 +00:00
}
bmi = append(bmi, i)
2019-07-05 14:29:17 +00:00
}
blsincl = append(blsincl, bmi)
2019-08-22 01:29:19 +00:00
smi := make([]uint64, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
if !ok {
2019-08-22 01:29:19 +00:00
i = uint64(len(secpkmsgs))
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
}
smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
2019-07-05 14:29:17 +00:00
}
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
fts := &store.FullTipSet{}
2019-07-05 14:29:17 +00:00
for i, b := range bts.Blocks {
fb := &types.FullBlock{
2019-07-05 14:29:17 +00:00
Header: b,
}
for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
2019-07-05 14:29:17 +00:00
}
for _, mi := range bts.SecpkMsgIncludes[i] {
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
}
2019-07-05 14:29:17 +00:00
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}