lotus/chain/blocksync.go
2019-11-07 15:11:39 +01:00

531 lines
12 KiB
Go

package chain
import (
"bufio"
"context"
"fmt"
"math/rand"
"sync"
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
func init() {
cbor.RegisterCborType(BlockSyncRequest{})
cbor.RegisterCborType(BlockSyncResponse{})
cbor.RegisterCborType(BSTipSet{})
}
type BlockSyncService struct {
cs *store.ChainStore
}
type BlockSyncRequest struct {
Start []cid.Cid
RequestLength uint64
Options uint64
}
type BSOptions struct {
IncludeBlocks bool
IncludeMessages bool
}
func ParseBSOptions(optfield uint64) *BSOptions {
return &BSOptions{
IncludeBlocks: optfield&(BSOptBlocks) != 0,
IncludeMessages: optfield&(BSOptMessages) != 0,
}
}
const (
BSOptBlocks = 1 << 0
BSOptMessages = 1 << 1
)
type BlockSyncResponse struct {
Chain []*BSTipSet
Status uint64
Message string
}
type BSTipSet struct {
Blocks []*types.BlockHeader
BlsMessages []*types.Message
BlsMsgIncludes [][]uint64
SecpkMessages []*types.SignedMessage
SecpkMsgIncludes [][]uint64
}
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
return &BlockSyncService{
cs: cs,
}
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
defer span.End()
defer s.Close()
var req BlockSyncRequest
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
log.Errorf("failed to read block sync request: %s", err)
return
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
resp, err := bss.processRequest(ctx, &req)
if err != nil {
log.Error("failed to process block sync request: ", err)
return
}
if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Error("failed to write back response for handle stream: ", err)
return
}
}
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
Status: 204,
Message: "no cids given in blocksync request",
}, nil
}
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
)
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
if err != nil {
log.Error("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{
Status: 203,
}, nil
}
return &BlockSyncResponse{
Chain: chain,
Status: 0,
}, nil
}
func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
var bstips []*BSTipSet
cur := start
for {
var bst BSTipSet
ts, err := bss.cs.LoadTipSet(cur)
if err != nil {
return nil, err
}
if opts.IncludeMessages {
bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
if err != nil {
return nil, xerrors.Errorf("gather messages failed: %w", err)
}
bst.BlsMessages = bmsgs
bst.BlsMsgIncludes = bmincl
bst.SecpkMessages = smsgs
bst.SecpkMsgIncludes = smincl
}
if opts.IncludeBlocks {
bst.Blocks = ts.Blocks()
}
bstips = append(bstips, &bst)
if uint64(len(bstips)) >= length || ts.Height() == 0 {
return bstips, nil
}
cur = ts.Parents()
}
}
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
blsmsgmap := make(map[cid.Cid]uint64)
secpkmsgmap := make(map[cid.Cid]uint64)
var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
var secpkincl, blsincl [][]uint64
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
if err != nil {
return nil, nil, nil, nil, err
}
bmi := make([]uint64, 0, len(bmsgs))
for _, m := range bmsgs {
i, ok := blsmsgmap[m.Cid()]
if !ok {
i = uint64(len(blsmsgs))
blsmsgs = append(blsmsgs, m)
blsmsgmap[m.Cid()] = i
}
bmi = append(bmi, i)
}
blsincl = append(blsincl, bmi)
smi := make([]uint64, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
if !ok {
i = uint64(len(secpkmsgs))
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
}
smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
}
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
}
type BlockSync struct {
bserv bserv.BlockService
newStream NewStreamFunc
syncPeersLk sync.Mutex
syncPeers map[peer.ID]struct{}
}
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
return &BlockSync{
bserv: bserv,
newStream: h.NewStream,
syncPeers: make(map[peer.ID]struct{}),
}
}
func (bs *BlockSync) getPeers() []peer.ID {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
var out []peer.ID
for p := range bs.syncPeers {
out = append(out, p)
}
return out
}
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
switch res.Status {
case 101: // Partial Response
panic("not handled")
case 201: // req.Start not found
return fmt.Errorf("not found")
case 202: // Go Away
panic("not handled")
case 203: // Internal Error
return fmt.Errorf("block sync peer errored: %s", res.Message)
case 204:
return fmt.Errorf("block sync request invalid: %s", res.Message)
default:
return fmt.Errorf("unrecognized response code: %d", res.Status)
}
}
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
trace.Int64Attribute("count", int64(count)),
)
}
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: tipset,
RequestLength: uint64(count),
Options: BSOptBlocks,
}
var oerr error
for _, p := range perm {
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
oerr = err
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}
if res.Status == 0 {
return bs.processBlocksResponse(req, res)
}
oerr = bs.processStatus(req, res)
if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr)
}
}
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
}
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: h,
RequestLength: 1,
Options: BSOptBlocks | BSOptMessages,
}
res, err := bs.sendRequestToPeer(ctx, p, req)
if err != nil {
return nil, err
}
switch res.Status {
case 0: // Success
if len(res.Chain) == 0 {
return nil, fmt.Errorf("got zero length chain response")
}
bts := res.Chain[0]
return bstsToFullTipSet(bts)
case 101: // Partial Response
panic("not handled")
case 201: // req.Start not found
return nil, fmt.Errorf("not found")
case 202: // Go Away
panic("not handled")
case 203: // Internal Error
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
case 204: // Invalid Request
return nil, fmt.Errorf("block sync request invalid: %q", res.Message)
default:
return nil, fmt.Errorf("unrecognized response code")
}
}
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
defer span.End()
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: h.Cids(),
RequestLength: count,
Options: BSOptMessages | BSOptBlocks,
}
var err error
for _, p := range perm {
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}
if res.Status == 0 {
return res.Chain, nil
}
err = bs.processStatus(req, res)
if err != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
}
}
// TODO: What if we have no peers (and err is nil)?
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
}
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
fts := &store.FullTipSet{}
for i, b := range bts.Blocks {
fb := &types.FullBlock{
Header: b,
}
for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
}
for _, mi := range bts.SecpkMsgIncludes[i] {
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
}
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
s, err := bs.newStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil {
return nil, err
}
if err := cborutil.WriteCborRPC(s, req); err != nil {
return nil, err
}
var res BlockSyncResponse
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
return nil, err
}
return &res, nil
}
func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
cur, err := types.NewTipSet(res.Chain[0].Blocks)
if err != nil {
return nil, err
}
out := []*types.TipSet{cur}
for bi := 1; bi < len(res.Chain); bi++ {
next := res.Chain[bi].Blocks
nts, err := types.NewTipSet(next)
if err != nil {
return nil, err
}
if !types.CidArrsEqual(cur.Parents(), nts.Cids()) {
return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi)
}
out = append(out, nts)
cur = nts
}
return out, nil
}
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
sb, err := bs.bserv.GetBlock(ctx, c)
if err != nil {
return nil, err
}
return types.DecodeBlock(sb.RawData())
}
func (bs *BlockSync) AddPeer(p peer.ID) {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
bs.syncPeers[p] = struct{}{}
}
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
msg, err := types.DecodeMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = msg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
smsg, err := types.DecodeSignedMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = smsg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error {
resp := bs.bserv.GetBlocks(context.TODO(), cids)
m := make(map[cid.Cid]int)
for i, c := range cids {
m[c] = i
}
for i := 0; i < len(cids); i++ {
select {
case v, ok := <-resp:
if !ok {
if i == len(cids)-1 {
break
}
return fmt.Errorf("failed to fetch all messages")
}
ix, ok := m[v.Cid()]
if !ok {
return fmt.Errorf("received message we didnt ask for")
}
if err := cb(ix, v); err != nil {
return err
}
}
}
return nil
}