lotus/chain/blocksync.go

512 lines
12 KiB
Go
Raw Normal View History

2019-07-05 14:29:17 +00:00
package chain
import (
"bufio"
"context"
"fmt"
"math/rand"
"sync"
bserv "github.com/ipfs/go-blockservice"
2019-07-08 13:36:43 +00:00
"github.com/libp2p/go-libp2p-core/host"
2019-07-05 14:46:21 +00:00
"github.com/libp2p/go-libp2p-core/protocol"
"golang.org/x/xerrors"
2019-07-26 04:54:22 +00:00
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
2019-07-05 14:29:17 +00:00
blocks "github.com/ipfs/go-block-format"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
2019-07-08 12:51:45 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-05 14:29:17 +00:00
)
2019-07-05 14:46:21 +00:00
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
2019-07-05 14:29:17 +00:00
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
func init() {
cbor.RegisterCborType(BlockSyncRequest{})
cbor.RegisterCborType(BlockSyncResponse{})
cbor.RegisterCborType(BSTipSet{})
}
type BlockSyncService struct {
2019-07-26 04:54:22 +00:00
cs *store.ChainStore
2019-07-05 14:29:17 +00:00
}
type BlockSyncRequest struct {
Start []cid.Cid
RequestLength uint64
Options uint64
}
type BSOptions struct {
IncludeBlocks bool
IncludeMessages bool
}
func ParseBSOptions(optfield uint64) *BSOptions {
return &BSOptions{
IncludeBlocks: optfield&(BSOptBlocks) != 0,
IncludeMessages: optfield&(BSOptMessages) != 0,
}
}
const (
BSOptBlocks = 1 << 0
BSOptMessages = 1 << 1
)
type BlockSyncResponse struct {
Chain []*BSTipSet
Status uint
Message string
}
type BSTipSet struct {
Blocks []*types.BlockHeader
2019-07-05 14:29:17 +00:00
BlsMessages []*types.Message
BlsMsgIncludes [][]int
SecpkMessages []*types.SignedMessage
SecpkMsgIncludes [][]int
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
2019-07-05 14:29:17 +00:00
return &BlockSyncService{
cs: cs,
}
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
defer s.Close()
var req BlockSyncRequest
2019-07-08 12:46:30 +00:00
if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
2019-07-05 14:29:17 +00:00
log.Errorf("failed to read block sync request: %s", err)
return
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
2019-07-05 14:29:17 +00:00
resp, err := bss.processRequest(&req)
if err != nil {
log.Error("failed to process block sync request: ", err)
return
}
2019-07-08 12:46:30 +00:00
if err := cborrpc.WriteCborRPC(s, resp); err != nil {
2019-07-05 14:29:17 +00:00
log.Error("failed to write back response for handle stream: ", err)
return
}
}
func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) {
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
2019-08-02 22:32:02 +00:00
Status: 204,
Message: "no cids given in blocksync request",
}, nil
}
2019-07-05 14:29:17 +00:00
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
if err != nil {
log.Error("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{
Status: 203,
}, nil
}
return &BlockSyncResponse{
Chain: chain,
Status: 0,
}, nil
}
func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
var bstips []*BSTipSet
cur := start
for {
var bst BSTipSet
ts, err := bss.cs.LoadTipSet(cur)
if err != nil {
return nil, err
}
if opts.IncludeMessages {
bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, xerrors.Errorf("gather messages failed: %w", err)
2019-07-05 14:29:17 +00:00
}
bst.BlsMessages = bmsgs
bst.BlsMsgIncludes = bmincl
bst.SecpkMessages = smsgs
bst.SecpkMsgIncludes = smincl
2019-07-05 14:29:17 +00:00
}
if opts.IncludeBlocks {
bst.Blocks = ts.Blocks()
}
bstips = append(bstips, &bst)
if uint64(len(bstips)) >= length || ts.Height() == 0 {
return bstips, nil
}
cur = ts.Parents()
}
}
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]int, []*types.SignedMessage, [][]int, error) {
blsmsgmap := make(map[cid.Cid]int)
secpkmsgmap := make(map[cid.Cid]int)
var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
var secpkincl, blsincl [][]int
2019-07-05 14:29:17 +00:00
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, nil, nil, nil, err
2019-07-05 14:29:17 +00:00
}
bmi := make([]int, 0, len(bmsgs))
for _, m := range bmsgs {
i, ok := blsmsgmap[m.Cid()]
2019-07-05 14:29:17 +00:00
if !ok {
i = len(blsmsgs)
blsmsgs = append(blsmsgs, m)
blsmsgmap[m.Cid()] = i
2019-07-05 14:29:17 +00:00
}
bmi = append(bmi, i)
2019-07-05 14:29:17 +00:00
}
blsincl = append(blsincl, bmi)
smi := make([]int, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
if !ok {
i = len(secpkmsgs)
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
}
smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
2019-07-05 14:29:17 +00:00
}
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
2019-07-05 14:29:17 +00:00
}
type BlockSync struct {
bserv bserv.BlockService
2019-07-05 14:29:17 +00:00
newStream NewStreamFunc
syncPeersLk sync.Mutex
syncPeers map[peer.ID]struct{}
}
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
2019-07-05 14:29:17 +00:00
return &BlockSync{
bserv: bserv,
2019-07-08 13:36:43 +00:00
newStream: h.NewStream,
2019-07-05 14:29:17 +00:00
syncPeers: make(map[peer.ID]struct{}),
}
}
func (bs *BlockSync) getPeers() []peer.ID {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
var out []peer.ID
for p := range bs.syncPeers {
out = append(out, p)
}
return out
}
2019-08-01 14:05:35 +00:00
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
switch res.Status {
case 101: // Partial Response
panic("not handled")
case 201: // req.Start not found
2019-08-01 14:05:35 +00:00
return fmt.Errorf("not found")
case 202: // Go Away
panic("not handled")
case 203: // Internal Error
2019-08-01 14:05:35 +00:00
return fmt.Errorf("block sync peer errored: %s", res.Message)
default:
2019-08-01 14:05:35 +00:00
return fmt.Errorf("unrecognized response code")
}
}
2019-07-26 04:54:22 +00:00
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
2019-07-05 14:29:17 +00:00
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: tipset,
RequestLength: uint64(count),
Options: BSOptBlocks,
}
var err error
for _, p := range perm {
2019-08-01 14:05:35 +00:00
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}
2019-08-01 14:05:35 +00:00
if res.Status == 0 {
return bs.processBlocksResponse(req, res)
}
2019-08-01 14:05:35 +00:00
err = bs.processStatus(req, res)
2019-08-01 17:19:18 +00:00
if err != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
}
2019-07-05 14:29:17 +00:00
}
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err)
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
2019-07-05 14:29:17 +00:00
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: h,
RequestLength: 1,
Options: BSOptBlocks | BSOptMessages,
}
res, err := bs.sendRequestToPeer(ctx, p, req)
if err != nil {
return nil, err
}
switch res.Status {
case 0: // Success
if len(res.Chain) == 0 {
return nil, fmt.Errorf("got zero length chain response")
}
bts := res.Chain[0]
return bstsToFullTipSet(bts)
case 101: // Partial Response
panic("not handled")
case 201: // req.Start not found
return nil, fmt.Errorf("not found")
case 202: // Go Away
panic("not handled")
case 203: // Internal Error
2019-08-02 22:32:02 +00:00
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
case 204: // Invalid Request
return nil, fmt.Errorf("block sync request invalid: %q", res.Message)
2019-07-05 14:29:17 +00:00
default:
return nil, fmt.Errorf("unrecognized response code")
}
}
2019-07-26 04:54:22 +00:00
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
2019-07-05 14:29:17 +00:00
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
req := &BlockSyncRequest{
Start: h.Cids(),
RequestLength: count,
Options: BSOptMessages | BSOptBlocks,
2019-07-05 14:29:17 +00:00
}
2019-08-01 14:05:35 +00:00
var err error
for _, p := range perm {
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}
2019-07-05 14:29:17 +00:00
2019-08-01 14:05:35 +00:00
if res.Status == 0 {
return res.Chain, nil
}
err = bs.processStatus(req, res)
2019-08-01 17:19:18 +00:00
if err != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
}
2019-07-05 14:29:17 +00:00
}
2019-08-01 14:05:35 +00:00
// TODO: What if we have no peers (and err is nil)?
return nil, xerrors.Errorf("GetChainMessages failed with all peers: %w", err)
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
fts := &store.FullTipSet{}
2019-07-05 14:29:17 +00:00
for i, b := range bts.Blocks {
fb := &types.FullBlock{
2019-07-05 14:29:17 +00:00
Header: b,
}
for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
2019-07-05 14:29:17 +00:00
}
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
s, err := bs.newStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil {
return nil, err
}
2019-07-08 12:46:30 +00:00
if err := cborrpc.WriteCborRPC(s, req); err != nil {
2019-07-05 14:29:17 +00:00
return nil, err
}
var res BlockSyncResponse
2019-07-08 12:46:30 +00:00
if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
2019-07-05 14:29:17 +00:00
return nil, err
}
return &res, nil
}
2019-07-26 04:54:22 +00:00
func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
cur, err := types.NewTipSet(res.Chain[0].Blocks)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
2019-07-26 04:54:22 +00:00
out := []*types.TipSet{cur}
2019-07-05 14:29:17 +00:00
for bi := 1; bi < len(res.Chain); bi++ {
next := res.Chain[bi].Blocks
2019-07-26 04:54:22 +00:00
nts, err := types.NewTipSet(next)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
if !cidArrsEqual(cur.Parents(), nts.Cids()) {
return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi)
}
out = append(out, nts)
cur = nts
}
return out, nil
}
func cidArrsEqual(a, b []cid.Cid) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if b[i] != v {
return false
}
}
return true
}
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
sb, err := bs.bserv.GetBlock(ctx, c)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
return types.DecodeBlock(sb.RawData())
2019-07-05 14:29:17 +00:00
}
func (bs *BlockSync) AddPeer(p peer.ID) {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
bs.syncPeers[p] = struct{}{}
}
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
msg, err := types.DecodeMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = msg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
2019-07-05 14:29:17 +00:00
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
smsg, err := types.DecodeSignedMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = smsg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error {
resp := bs.bserv.GetBlocks(context.TODO(), cids)
2019-07-05 14:29:17 +00:00
m := make(map[cid.Cid]int)
for i, c := range cids {
m[c] = i
}
for i := 0; i < len(cids); i++ {
select {
case v, ok := <-resp:
if !ok {
if i == len(cids)-1 {
break
}
return fmt.Errorf("failed to fetch all messages")
2019-07-05 14:29:17 +00:00
}
ix, ok := m[v.Cid()]
2019-07-05 14:29:17 +00:00
if !ok {
return fmt.Errorf("received message we didnt ask for")
2019-07-05 14:29:17 +00:00
}
if err := cb(ix, v); err != nil {
return err
2019-07-05 14:29:17 +00:00
}
}
}
return nil
2019-07-05 14:29:17 +00:00
}