split files, implement peer tracker
This commit is contained in:
parent
904fdad4c6
commit
c74f87fd51
@ -3,12 +3,7 @@ package blocksync
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
@ -16,9 +11,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log"
|
||||
@ -224,151 +217,6 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message,
|
||||
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
|
||||
}
|
||||
|
||||
type BlockSync struct {
|
||||
bserv bserv.BlockService
|
||||
host host.Host
|
||||
|
||||
syncPeersLk sync.Mutex
|
||||
syncPeers map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
|
||||
return &BlockSync{
|
||||
bserv: bserv,
|
||||
host: h,
|
||||
syncPeers: make(map[peer.ID]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
||||
switch res.Status {
|
||||
case 101: // Partial Response
|
||||
panic("not handled")
|
||||
case 201: // req.Start not found
|
||||
return fmt.Errorf("not found")
|
||||
case 202: // Go Away
|
||||
panic("not handled")
|
||||
case 203: // Internal Error
|
||||
return fmt.Errorf("block sync peer errored: %s", res.Message)
|
||||
case 204:
|
||||
return fmt.Errorf("block sync request invalid: %s", res.Message)
|
||||
default:
|
||||
return fmt.Errorf("unrecognized response code: %d", res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
|
||||
defer span.End()
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
|
||||
trace.Int64Attribute("count", int64(count)),
|
||||
)
|
||||
}
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: tipset,
|
||||
RequestLength: uint64(count),
|
||||
Options: BSOptBlocks,
|
||||
}
|
||||
|
||||
var oerr error
|
||||
for _, p := range perm {
|
||||
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
|
||||
if err != nil {
|
||||
oerr = err
|
||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Status == 0 {
|
||||
return bs.processBlocksResponse(req, res)
|
||||
}
|
||||
oerr = bs.processStatus(req, res)
|
||||
if oerr != nil {
|
||||
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr)
|
||||
}
|
||||
}
|
||||
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: h,
|
||||
RequestLength: 1,
|
||||
Options: BSOptBlocks | BSOptMessages,
|
||||
}
|
||||
|
||||
res, err := bs.sendRequestToPeer(ctx, p, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch res.Status {
|
||||
case 0: // Success
|
||||
if len(res.Chain) == 0 {
|
||||
return nil, fmt.Errorf("got zero length chain response")
|
||||
}
|
||||
bts := res.Chain[0]
|
||||
|
||||
return bstsToFullTipSet(bts)
|
||||
case 101: // Partial Response
|
||||
panic("not handled")
|
||||
case 201: // req.Start not found
|
||||
return nil, fmt.Errorf("not found")
|
||||
case 202: // Go Away
|
||||
panic("not handled")
|
||||
case 203: // Internal Error
|
||||
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
|
||||
case 204: // Invalid Request
|
||||
return nil, fmt.Errorf("block sync request invalid: %q", res.Message)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized response code")
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
|
||||
defer span.End()
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: h.Cids(),
|
||||
RequestLength: count,
|
||||
Options: BSOptMessages | BSOptBlocks,
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, p := range perm {
|
||||
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
|
||||
if err != nil {
|
||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Status == 0 {
|
||||
return res.Chain, nil
|
||||
}
|
||||
err = bs.processStatus(req, res)
|
||||
if err != nil {
|
||||
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: What if we have no peers (and err is nil)?
|
||||
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
||||
}
|
||||
|
||||
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
|
||||
fts := &store.FullTipSet{}
|
||||
for i, b := range bts.Blocks {
|
||||
@ -387,167 +235,3 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
|
||||
|
||||
return fts, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
|
||||
defer span.End()
|
||||
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("peer", p.Pretty()),
|
||||
)
|
||||
}
|
||||
|
||||
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
|
||||
if err != nil {
|
||||
bs.RemovePeer(p)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res BlockSyncResponse
|
||||
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
|
||||
cur, err := types.NewTipSet(res.Chain[0].Blocks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := []*types.TipSet{cur}
|
||||
for bi := 1; bi < len(res.Chain); bi++ {
|
||||
next := res.Chain[bi].Blocks
|
||||
nts, err := types.NewTipSet(next)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !types.CidArrsEqual(cur.Parents(), nts.Cids()) {
|
||||
return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi)
|
||||
}
|
||||
|
||||
out = append(out, nts)
|
||||
cur = nts
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
|
||||
sb, err := bs.bserv.GetBlock(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return types.DecodeBlock(sb.RawData())
|
||||
}
|
||||
|
||||
func (bs *BlockSync) AddPeer(p peer.ID) {
|
||||
bs.syncPeersLk.Lock()
|
||||
defer bs.syncPeersLk.Unlock()
|
||||
bs.syncPeers[p] = struct{}{}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) RemovePeer(p peer.ID) {
|
||||
bs.syncPeersLk.Lock()
|
||||
defer bs.syncPeersLk.Unlock()
|
||||
delete(bs.syncPeers, p)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) getPeers() []peer.ID {
|
||||
bs.syncPeersLk.Lock()
|
||||
defer bs.syncPeersLk.Unlock()
|
||||
var out []peer.ID
|
||||
for p := range bs.syncPeers {
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (bs *BlockSync) logPeerQuality(p peer.ID) {
|
||||
|
||||
}
|
||||
|
||||
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
|
||||
out := make([]*types.Message, len(cids))
|
||||
|
||||
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
|
||||
msg, err := types.DecodeMessage(b.RawData())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out[i] != nil {
|
||||
return fmt.Errorf("received duplicate message")
|
||||
}
|
||||
|
||||
out[i] = msg
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
|
||||
out := make([]*types.SignedMessage, len(cids))
|
||||
|
||||
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
|
||||
smsg, err := types.DecodeSignedMessage(b.RawData())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out[i] != nil {
|
||||
return fmt.Errorf("received duplicate message")
|
||||
}
|
||||
|
||||
out[i] = smsg
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error {
|
||||
resp := bs.bserv.GetBlocks(context.TODO(), cids)
|
||||
|
||||
m := make(map[cid.Cid]int)
|
||||
for i, c := range cids {
|
||||
m[c] = i
|
||||
}
|
||||
|
||||
for i := 0; i < len(cids); i++ {
|
||||
select {
|
||||
case v, ok := <-resp:
|
||||
if !ok {
|
||||
if i == len(cids)-1 {
|
||||
break
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to fetch all messages")
|
||||
}
|
||||
|
||||
ix, ok := m[v.Cid()]
|
||||
if !ok {
|
||||
return fmt.Errorf("received message we didnt ask for")
|
||||
}
|
||||
|
||||
if err := cb(ix, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
400
chain/blocksync/blocksync_client.go
Normal file
400
chain/blocksync/blocksync_client.go
Normal file
@ -0,0 +1,400 @@
|
||||
package blocksync
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
inet "github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
host "github.com/libp2p/go-libp2p-host"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
type BlockSync struct {
|
||||
bserv bserv.BlockService
|
||||
host host.Host
|
||||
|
||||
syncPeersLk sync.Mutex
|
||||
syncPeers *bsPeerTracker
|
||||
}
|
||||
|
||||
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
|
||||
return &BlockSync{
|
||||
bserv: bserv,
|
||||
host: h,
|
||||
syncPeers: newPeerTracker(),
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
||||
switch res.Status {
|
||||
case 101: // Partial Response
|
||||
panic("not handled")
|
||||
case 201: // req.Start not found
|
||||
return fmt.Errorf("not found")
|
||||
case 202: // Go Away
|
||||
panic("not handled")
|
||||
case 203: // Internal Error
|
||||
return fmt.Errorf("block sync peer errored: %s", res.Message)
|
||||
case 204:
|
||||
return fmt.Errorf("block sync request invalid: %s", res.Message)
|
||||
default:
|
||||
return fmt.Errorf("unrecognized response code: %d", res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
|
||||
defer span.End()
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
|
||||
trace.Int64Attribute("count", int64(count)),
|
||||
)
|
||||
}
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: tipset,
|
||||
RequestLength: uint64(count),
|
||||
Options: BSOptBlocks,
|
||||
}
|
||||
|
||||
var oerr error
|
||||
for _, p := range perm {
|
||||
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
|
||||
if err != nil {
|
||||
oerr = err
|
||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Status == 0 {
|
||||
return bs.processBlocksResponse(req, res)
|
||||
}
|
||||
oerr = bs.processStatus(req, res)
|
||||
if oerr != nil {
|
||||
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr)
|
||||
}
|
||||
}
|
||||
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: h,
|
||||
RequestLength: 1,
|
||||
Options: BSOptBlocks | BSOptMessages,
|
||||
}
|
||||
|
||||
res, err := bs.sendRequestToPeer(ctx, p, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch res.Status {
|
||||
case 0: // Success
|
||||
if len(res.Chain) == 0 {
|
||||
return nil, fmt.Errorf("got zero length chain response")
|
||||
}
|
||||
bts := res.Chain[0]
|
||||
|
||||
return bstsToFullTipSet(bts)
|
||||
case 101: // Partial Response
|
||||
return nil, xerrors.Errorf("partial responses are not handled")
|
||||
case 201: // req.Start not found
|
||||
return nil, fmt.Errorf("not found")
|
||||
case 202: // Go Away
|
||||
return nil, xerrors.Errorf("received 'go away' response peer")
|
||||
case 203: // Internal Error
|
||||
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
|
||||
case 204: // Invalid Request
|
||||
return nil, fmt.Errorf("block sync request invalid: %q", res.Message)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized response code")
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
|
||||
defer span.End()
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: h.Cids(),
|
||||
RequestLength: count,
|
||||
Options: BSOptMessages | BSOptBlocks,
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, p := range perm {
|
||||
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
|
||||
if err != nil {
|
||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Status == 0 {
|
||||
return res.Chain, nil
|
||||
}
|
||||
err = bs.processStatus(req, res)
|
||||
if err != nil {
|
||||
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: What if we have no peers (and err is nil)?
|
||||
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
|
||||
defer span.End()
|
||||
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("peer", p.Pretty()),
|
||||
)
|
||||
}
|
||||
|
||||
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
|
||||
if err != nil {
|
||||
bs.RemovePeer(p)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res BlockSyncResponse
|
||||
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
|
||||
cur, err := types.NewTipSet(res.Chain[0].Blocks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := []*types.TipSet{cur}
|
||||
for bi := 1; bi < len(res.Chain); bi++ {
|
||||
next := res.Chain[bi].Blocks
|
||||
nts, err := types.NewTipSet(next)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !types.CidArrsEqual(cur.Parents(), nts.Cids()) {
|
||||
return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi)
|
||||
}
|
||||
|
||||
out = append(out, nts)
|
||||
cur = nts
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
|
||||
sb, err := bs.bserv.GetBlock(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return types.DecodeBlock(sb.RawData())
|
||||
}
|
||||
|
||||
func (bs *BlockSync) AddPeer(p peer.ID) {
|
||||
bs.syncPeers.addPeer(p)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) RemovePeer(p peer.ID) {
|
||||
bs.syncPeers.removePeer(p)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) getPeers() []peer.ID {
|
||||
return bs.syncPeers.prefSortedPeers()
|
||||
}
|
||||
|
||||
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
|
||||
out := make([]*types.Message, len(cids))
|
||||
|
||||
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
|
||||
msg, err := types.DecodeMessage(b.RawData())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out[i] != nil {
|
||||
return fmt.Errorf("received duplicate message")
|
||||
}
|
||||
|
||||
out[i] = msg
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
|
||||
out := make([]*types.SignedMessage, len(cids))
|
||||
|
||||
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
|
||||
smsg, err := types.DecodeSignedMessage(b.RawData())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out[i] != nil {
|
||||
return fmt.Errorf("received duplicate message")
|
||||
}
|
||||
|
||||
out[i] = smsg
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error {
|
||||
resp := bs.bserv.GetBlocks(context.TODO(), cids)
|
||||
|
||||
m := make(map[cid.Cid]int)
|
||||
for i, c := range cids {
|
||||
m[c] = i
|
||||
}
|
||||
|
||||
for i := 0; i < len(cids); i++ {
|
||||
select {
|
||||
case v, ok := <-resp:
|
||||
if !ok {
|
||||
if i == len(cids)-1 {
|
||||
break
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to fetch all messages")
|
||||
}
|
||||
|
||||
ix, ok := m[v.Cid()]
|
||||
if !ok {
|
||||
return fmt.Errorf("received message we didnt ask for")
|
||||
}
|
||||
|
||||
if err := cb(ix, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type peerStats struct {
|
||||
successes int
|
||||
failures int
|
||||
firstSeen time.Time
|
||||
}
|
||||
|
||||
type bsPeerTracker struct {
|
||||
peers map[peer.ID]*peerStats
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func newPeerTracker() *bsPeerTracker {
|
||||
return &bsPeerTracker{
|
||||
peers: make(map[peer.ID]*peerStats),
|
||||
}
|
||||
}
|
||||
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
|
||||
bpt.lk.Lock()
|
||||
defer bpt.lk.Unlock()
|
||||
if _, ok := bpt.peers[p]; ok {
|
||||
return
|
||||
}
|
||||
bpt.peers[p] = &peerStats{
|
||||
firstSeen: time.Now(),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
|
||||
// TODO: this could probably be cached, but as long as its not too many peers, fine for now
|
||||
bpt.lk.Lock()
|
||||
defer bpt.lk.Unlock()
|
||||
out := make([]peer.ID, 0, len(bpt.peers))
|
||||
for p := range bpt.peers {
|
||||
out = append(out, p)
|
||||
}
|
||||
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
pi := bpt.peers[out[i]]
|
||||
pj := bpt.peers[out[j]]
|
||||
if pi.successes > pj.successes {
|
||||
return true
|
||||
}
|
||||
if pi.failures < pj.successes {
|
||||
return true
|
||||
}
|
||||
return pi.firstSeen.Before(pj.firstSeen)
|
||||
})
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (bpt *bsPeerTracker) logSuccess(p peer.ID) {
|
||||
bpt.lk.Lock()
|
||||
defer bpt.lk.Unlock()
|
||||
if pi, ok := bpt.peers[p]; !ok {
|
||||
log.Warn("log success called on peer not in tracker")
|
||||
return
|
||||
} else {
|
||||
pi.successes++
|
||||
}
|
||||
}
|
||||
|
||||
func (bpt *bsPeerTracker) logFailure(p peer.ID) {
|
||||
bpt.lk.Lock()
|
||||
defer bpt.lk.Unlock()
|
||||
if pi, ok := bpt.peers[p]; !ok {
|
||||
log.Warn("log failure called on peer not in tracker")
|
||||
return
|
||||
} else {
|
||||
pi.failures++
|
||||
}
|
||||
}
|
||||
|
||||
func (bpt *bsPeerTracker) removePeer(p peer.ID) {
|
||||
bpt.lk.Lock()
|
||||
defer bpt.lk.Unlock()
|
||||
delete(bpt.peers, p)
|
||||
}
|
1
go.mod
1
go.mod
@ -45,6 +45,7 @@ require (
|
||||
github.com/libp2p/go-libp2p-connmgr v0.1.0
|
||||
github.com/libp2p/go-libp2p-core v0.2.2
|
||||
github.com/libp2p/go-libp2p-discovery v0.1.0
|
||||
github.com/libp2p/go-libp2p-host v0.1.0
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1
|
||||
github.com/libp2p/go-libp2p-mplex v0.2.1
|
||||
github.com/libp2p/go-libp2p-peer v0.2.0
|
||||
|
2
go.sum
2
go.sum
@ -305,6 +305,8 @@ github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoA
|
||||
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
|
||||
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
|
||||
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
|
||||
github.com/libp2p/go-libp2p-host v0.1.0 h1:OZwENiFm6JOK3YR5PZJxkXlJE8a5u8g4YvAUrEV2MjM=
|
||||
github.com/libp2p/go-libp2p-host v0.1.0/go.mod h1:5+fWuLbDn8OxoxPN3CV0vsLe1hAKScSMbT84qRfxum8=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M=
|
||||
github.com/libp2p/go-libp2p-kbucket v0.2.0 h1:FB2a0VkOTNGTP5gu/I444u4WabNM9V1zCkQcWb7zajI=
|
||||
|
Loading…
Reference in New Issue
Block a user