package exchange import ( "bufio" "context" "fmt" "math/rand" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "go.opencensus.io/trace" "go.uber.org/fx" "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" incrt "github.com/filecoin-project/lotus/lib/increadtimeout" "github.com/filecoin-project/lotus/lib/peermgr" ) // client implements exchange.Client, using the libp2p ChainExchange protocol // as the fetching mechanism. type client struct { // Connection manager used to contact the server. // FIXME: We should have a reduced interface here, initialized // just with our protocol ID, we shouldn't be able to open *any* // connection. host host.Host peerTracker *bsPeerTracker } var _ Client = (*client)(nil) // NewClient creates a new libp2p-based exchange.Client that uses the libp2p // ChainExhange protocol as the fetching mechanism. func NewClient(lc fx.Lifecycle, host host.Host, pmgr peermgr.MaybePeerMgr) Client { return &client{ host: host, peerTracker: newPeerTracker(lc, host, pmgr.Mgr), } } // Main logic of the client request service. The provided `Request` // is sent to the `singlePeer` if one is indicated or to all available // ones otherwise. The response is processed and validated according // to the `Request` options. Either a `validatedResponse` is returned // (which can be safely accessed), or an `error` that may represent // either a response error status, a failed validation or an internal // error. // // This is the internal single point of entry for all external-facing // APIs, currently we have 3 very heterogeneous services exposed: // * GetBlocks: Headers // * GetFullTipSet: Headers | Messages // * GetChainMessages: Messages // This function handles all the different combinations of the available // request options without disrupting external calls. In the future the // consumers should be forced to use a more standardized service and // adhere to a single API derived from this function. func (c *client) doRequest( ctx context.Context, req *Request, singlePeer *peer.ID, // In the `GetChainMessages` case, we won't request the headers but we still // need them to check the integrity of the `CompactedMessages` in the response // so the tipset blocks need to be provided by the caller. tipsets []*types.TipSet, ) (*validatedResponse, error) { // Validate request. if req.Length == 0 { return nil, xerrors.Errorf("invalid request of length 0") } if req.Length > MaxRequestLength { return nil, xerrors.Errorf("request length (%d) above maximum (%d)", req.Length, MaxRequestLength) } if req.Options == 0 { return nil, xerrors.Errorf("request with no options set") } // Generate the list of peers to be queried, either the // `singlePeer` indicated or all peers available (sorted // by an internal peer tracker with some randomness injected). var peers []peer.ID if singlePeer != nil { peers = []peer.ID{*singlePeer} } else { peers = c.getShuffledPeers() if len(peers) == 0 { return nil, xerrors.Errorf("no peers available") } } // Try the request for each peer in the list, // return on the first successful response. // FIXME: Doing this serially isn't great, but fetching in parallel // may not be a good idea either. Think about this more. globalTime := build.Clock.Now() // Global time used to track what is the expected time we will need to get // a response if a client fails us. for _, peer := range peers { select { case <-ctx.Done(): return nil, xerrors.Errorf("context cancelled: %w", ctx.Err()) default: } // Send request, read response. res, err := c.sendRequestToPeer(ctx, peer, req) if err != nil { if !xerrors.Is(err, network.ErrNoConn) { log.Warnf("could not send request to peer %s: %s", peer.String(), err) } continue } // Process and validate response. validRes, err := c.processResponse(req, res, tipsets) if err != nil { log.Warnf("processing peer %s response failed: %s", peer.String(), err) continue } c.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime)) c.host.ConnManager().TagPeer(peer, "bsync", SuccessPeerTagValue) return validRes, nil } errString := "doRequest failed for all peers" if singlePeer != nil { errString = fmt.Sprintf("doRequest failed for single peer %s", *singlePeer) } return nil, xerrors.Errorf(errString) } // Process and validate response. Check the status, the integrity of the // information returned, and that it matches the request. Extract the information // into a `validatedResponse` for the external-facing APIs to select what they // need. // // We are conflating in the single error returned both status and validation // errors. Peer penalization should happen here then, before returning, so // we can apply the correct penalties depending on the cause of the error. // FIXME: Add the `peer` as argument once we implement penalties. func (c *client) processResponse(req *Request, res *Response, tipsets []*types.TipSet) (r *validatedResponse, err error) { err = res.statusToError() if err != nil { return nil, xerrors.Errorf("status error: %s", err) } defer func() { if rerr := recover(); rerr != nil { log.Errorf("process response error: %s", rerr) err = xerrors.Errorf("process response error: %s", rerr) return } }() options := parseOptions(req.Options) if options.noOptionsSet() { // Safety check: this shouldn't have been sent, and even if it did // it should have been caught by the peer in its error status. return nil, xerrors.Errorf("nothing was requested") } // Verify that the chain segment returned is in the valid range. // Note that the returned length might be less than requested. resLength := len(res.Chain) if resLength == 0 { return nil, xerrors.Errorf("got no chain in successful response") } if resLength > int(req.Length) { return nil, xerrors.Errorf("got longer response (%d) than requested (%d)", resLength, req.Length) } if resLength < int(req.Length) && res.Status != Partial { return nil, xerrors.Errorf("got less than requested without a proper status: %d", res.Status) } validRes := &validatedResponse{} if options.IncludeHeaders { // Check for valid block sets and extract them into `TipSet`s. validRes.tipsets = make([]*types.TipSet, resLength) for i := 0; i < resLength; i++ { if res.Chain[i] == nil { return nil, xerrors.Errorf("response with nil tipset in pos %d", i) } for blockIdx, block := range res.Chain[i].Blocks { if block == nil { return nil, xerrors.Errorf("tipset with nil block in pos %d", blockIdx) // FIXME: Maybe we should move this check to `NewTipSet`. } } validRes.tipsets[i], err = types.NewTipSet(res.Chain[i].Blocks) if err != nil { return nil, xerrors.Errorf("invalid tipset blocks at height (head - %d): %w", i, err) } } // Check that the returned head matches the one requested. if !types.CidArrsEqual(validRes.tipsets[0].Cids(), req.Head) { return nil, xerrors.Errorf("returned chain head does not match request") } // Check `TipSet`s are connected (valid chain). for i := 0; i < len(validRes.tipsets)-1; i++ { if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false { return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)", i, i+1) // FIXME: Maybe give more information here, like CIDs. } } } if options.IncludeMessages { validRes.messages = make([]*CompactedMessages, resLength) for i := 0; i < resLength; i++ { if res.Chain[i].Messages == nil { return nil, xerrors.Errorf("no messages included for tipset at height (head - %d)", i) } validRes.messages[i] = res.Chain[i].Messages } if options.IncludeHeaders { // If the headers were also returned check that the compression // indexes are valid before `toFullTipSets()` is called by the // consumer. err := c.validateCompressedIndices(res.Chain) if err != nil { return nil, err } } else { // If we didn't request the headers they should have been provided // by the caller. if len(tipsets) < len(res.Chain) { return nil, xerrors.Errorf("not enought tipsets provided for message response validation, needed %d, have %d", len(res.Chain), len(tipsets)) } chain := make([]*BSTipSet, 0, resLength) for i, resChain := range res.Chain { next := &BSTipSet{ Blocks: tipsets[i].Blocks(), Messages: resChain.Messages, } chain = append(chain, next) } err := c.validateCompressedIndices(chain) if err != nil { return nil, err } } } return validRes, nil } func (c *client) validateCompressedIndices(chain []*BSTipSet) error { resLength := len(chain) for tipsetIdx := 0; tipsetIdx < resLength; tipsetIdx++ { msgs := chain[tipsetIdx].Messages blocksNum := len(chain[tipsetIdx].Blocks) if len(msgs.BlsIncludes) != blocksNum { return xerrors.Errorf("BlsIncludes (%d) does not match number of blocks (%d)", len(msgs.BlsIncludes), blocksNum) } if len(msgs.SecpkIncludes) != blocksNum { return xerrors.Errorf("SecpkIncludes (%d) does not match number of blocks (%d)", len(msgs.SecpkIncludes), blocksNum) } for blockIdx := 0; blockIdx < blocksNum; blockIdx++ { for _, mi := range msgs.BlsIncludes[blockIdx] { if int(mi) >= len(msgs.Bls) { return xerrors.Errorf("index in BlsIncludes (%d) exceeds number of messages (%d)", mi, len(msgs.Bls)) } } for _, mi := range msgs.SecpkIncludes[blockIdx] { if int(mi) >= len(msgs.Secpk) { return xerrors.Errorf("index in SecpkIncludes (%d) exceeds number of messages (%d)", mi, len(msgs.Secpk)) } } } } return nil } // GetBlocks implements Client.GetBlocks(). Refer to the godocs there. func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") defer span.End() if span.IsRecordingEvents() { span.AddAttributes( trace.StringAttribute("tipset", fmt.Sprint(tsk.Cids())), trace.Int64Attribute("count", int64(count)), ) } req := &Request{ Head: tsk.Cids(), Length: uint64(count), Options: Headers, } validRes, err := c.doRequest(ctx, req, nil, nil) if err != nil { return nil, err } return validRes.tipsets, nil } // GetFullTipSet implements Client.GetFullTipSet(). Refer to the godocs there. func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) { // TODO: round robin through these peers on error req := &Request{ Head: tsk.Cids(), Length: 1, Options: Headers | Messages, } validRes, err := c.doRequest(ctx, req, &peer, nil) if err != nil { return nil, err } return validRes.toFullTipSets()[0], nil // If `doRequest` didn't fail we are guaranteed to have at least // *one* tipset here, so it's safe to index directly. } // GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there. func (c *client) GetChainMessages(ctx context.Context, tipsets []*types.TipSet) ([]*CompactedMessages, error) { head := tipsets[0] length := uint64(len(tipsets)) ctx, span := trace.StartSpan(ctx, "GetChainMessages") if span.IsRecordingEvents() { span.AddAttributes( trace.StringAttribute("tipset", fmt.Sprint(head.Cids())), trace.Int64Attribute("count", int64(length)), ) } defer span.End() req := &Request{ Head: head.Cids(), Length: length, Options: Messages, } validRes, err := c.doRequest(ctx, req, nil, tipsets) if err != nil { return nil, err } return validRes.messages, nil } // Send a request to a peer. Write request in the stream and read the // response back. We do not do any processing of the request/response // here. func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Request) (_ *Response, err error) { // Trace code. ctx, span := trace.StartSpan(ctx, "sendRequestToPeer") defer span.End() if span.IsRecordingEvents() { span.AddAttributes( trace.StringAttribute("peer", peer.String()), ) } defer func() { if err != nil { if span.IsRecordingEvents() { span.SetStatus(trace.Status{ Code: 5, Message: err.Error(), }) } } }() // -- TRACE -- supported, err := c.host.Peerstore().SupportsProtocols(peer, ChainExchangeProtocolID) if err != nil { c.RemovePeer(peer) return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } if len(supported) == 0 || (supported[0] != ChainExchangeProtocolID) { return nil, xerrors.Errorf("peer %s does not support protocols %s", peer, []string{ChainExchangeProtocolID}) } connectionStart := build.Clock.Now() // Open stream to peer. stream, err := c.host.NewStream( network.WithNoDial(ctx, "should already have connection"), peer, ChainExchangeProtocolID) if err != nil { c.RemovePeer(peer) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } defer stream.Close() //nolint:errcheck // Write request. _ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline)) if err := cborutil.WriteCborRPC(stream, req); err != nil { _ = stream.SetWriteDeadline(time.Time{}) c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) // FIXME: Should we also remove peer here? return nil, err } _ = stream.SetWriteDeadline(time.Time{}) // clear deadline // FIXME: Needs // its own API (https://github.com/libp2p/go-libp2p/core/issues/162). if err := stream.CloseWrite(); err != nil { log.Warnw("CloseWrite err", "error", err) } // Read response. var res Response err = cborutil.ReadCborRPC( bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)), &res) if err != nil { c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) return nil, xerrors.Errorf("failed to read chainxchg response: %w", err) } // FIXME: Move all this together at the top using a defer as done elsewhere. // Maybe we need to declare `res` in the signature. if span.IsRecordingEvents() { span.AddAttributes( trace.Int64Attribute("resp_status", int64(res.Status)), trace.StringAttribute("msg", res.ErrorMessage), trace.Int64Attribute("chain_len", int64(len(res.Chain))), ) } c.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain))) // FIXME: We should really log a success only after we validate the response. // It might be a bit hard to do. return &res, nil } // AddPeer implements Client.AddPeer(). Refer to the godocs there. func (c *client) AddPeer(p peer.ID) { c.peerTracker.addPeer(p) } // RemovePeer implements Client.RemovePeer(). Refer to the godocs there. func (c *client) RemovePeer(p peer.ID) { c.peerTracker.removePeer(p) } // getShuffledPeers returns a preference-sorted set of peers (by latency // and failure counting), shuffling the first few peers so we don't always // pick the same peer. // FIXME: Consider merging with `shufflePrefix()s`. func (c *client) getShuffledPeers() []peer.ID { peers := c.peerTracker.prefSortedPeers() shufflePrefix(peers) return peers } func shufflePrefix(peers []peer.ID) { prefix := ShufflePeersPrefix if len(peers) < prefix { prefix = len(peers) } buf := make([]peer.ID, prefix) perm := rand.Perm(prefix) for i, v := range perm { buf[i] = peers[v] } copy(peers, buf) }