From 55b1456d4568da630e25f1479000c43694b86493 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 7 Sep 2020 19:31:43 +0100 Subject: [PATCH 1/3] blocksync: introduce interfaces; rename to chainexchange. --- chain/{blocksync => exchange}/cbor_gen.go | 2 +- chain/{blocksync => exchange}/client.go | 124 +++++++----------- chain/exchange/doc.go | 19 +++ chain/exchange/interfaces.go | 51 +++++++ chain/{blocksync => exchange}/peer_tracker.go | 2 +- chain/{blocksync => exchange}/protocol.go | 20 +-- chain/{blocksync => exchange}/server.go | 65 +++------ chain/sync.go | 36 ++--- gen/main.go | 10 +- node/builder.go | 6 +- node/modules/chain.go | 6 +- node/modules/services.go | 6 +- 12 files changed, 186 insertions(+), 161 deletions(-) rename chain/{blocksync => exchange}/cbor_gen.go (99%) rename chain/{blocksync => exchange}/client.go (80%) create mode 100644 chain/exchange/doc.go create mode 100644 chain/exchange/interfaces.go rename chain/{blocksync => exchange}/peer_tracker.go (99%) rename chain/{blocksync => exchange}/protocol.go (94%) rename chain/{blocksync => exchange}/server.go (75%) diff --git a/chain/blocksync/cbor_gen.go b/chain/exchange/cbor_gen.go similarity index 99% rename from chain/blocksync/cbor_gen.go rename to chain/exchange/cbor_gen.go index cd43f4a64..dc91babe3 100644 --- a/chain/blocksync/cbor_gen.go +++ b/chain/exchange/cbor_gen.go @@ -1,6 +1,6 @@ // Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. -package blocksync +package exchange import ( "fmt" diff --git a/chain/blocksync/client.go b/chain/exchange/client.go similarity index 80% rename from chain/blocksync/client.go rename to chain/exchange/client.go index 38e1f6d2c..2133b7805 100644 --- a/chain/blocksync/client.go +++ b/chain/exchange/client.go @@ -1,4 +1,4 @@ -package blocksync +package exchange import ( "bufio" @@ -7,13 +7,15 @@ import ( "math/rand" "time" - host "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "go.opencensus.io/trace" "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -21,11 +23,9 @@ import ( "github.com/filecoin-project/lotus/lib/peermgr" ) -// Protocol client. -// FIXME: Rename to just `Client`. Not done at the moment to avoid -// disrupting too much of the consumer code, should be done along -// https://github.com/filecoin-project/lotus/issues/2612. -type BlockSync struct { +// client implements exchange.Client, using the libp2p ChainExchange protocol +// as the fetching mechanism. +type client struct { // Connection manager used to contact the server. // FIXME: We should have a reduced interface here, initialized // just with our protocol ID, we shouldn't be able to open *any* @@ -35,11 +35,12 @@ type BlockSync struct { peerTracker *bsPeerTracker } -func NewClient( - host host.Host, - pmgr peermgr.MaybePeerMgr, -) *BlockSync { - return &BlockSync{ +var _ Client = (*client)(nil) + +// NewClient creates a new libp2p-based exchange.Client that uses the libp2p +// ChainExhange protocol as the fetching mechanism. +func NewClient(host host.Host, pmgr peermgr.MaybePeerMgr) Client { + return &client{ host: host, peerTracker: newPeerTracker(pmgr.Mgr), } @@ -62,11 +63,7 @@ func NewClient( // request options without disrupting external calls. In the future the // consumers should be forced to use a more standardized service and // adhere to a single API derived from this function. -func (client *BlockSync) doRequest( - ctx context.Context, - req *Request, - singlePeer *peer.ID, -) (*validatedResponse, error) { +func (c *client) doRequest(ctx context.Context, req *Request, singlePeer *peer.ID) (*validatedResponse, error) { // Validate request. if req.Length == 0 { return nil, xerrors.Errorf("invalid request of length 0") @@ -86,7 +83,7 @@ func (client *BlockSync) doRequest( if singlePeer != nil { peers = []peer.ID{*singlePeer} } else { - peers = client.getShuffledPeers() + peers = c.getShuffledPeers() if len(peers) == 0 { return nil, xerrors.Errorf("no peers available") } @@ -107,9 +104,9 @@ func (client *BlockSync) doRequest( } // Send request, read response. - res, err := client.sendRequestToPeer(ctx, peer, req) + res, err := c.sendRequestToPeer(ctx, peer, req) if err != nil { - if !xerrors.Is(err, inet.ErrNoConn) { + if !xerrors.Is(err, network.ErrNoConn) { log.Warnf("could not connect to peer %s: %s", peer.String(), err) } @@ -117,15 +114,15 @@ func (client *BlockSync) doRequest( } // Process and validate response. - validRes, err := client.processResponse(req, res) + validRes, err := c.processResponse(req, res) if err != nil { log.Warnf("processing peer %s response failed: %s", peer.String(), err) continue } - client.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime)) - client.host.ConnManager().TagPeer(peer, "bsync", SUCCESS_PEER_TAG_VALUE) + c.peerTracker.logGlobalSuccess(build.Clock.Since(globalTime)) + c.host.ConnManager().TagPeer(peer, "bsync", SuccessPeerTagValue) return validRes, nil } @@ -144,11 +141,8 @@ func (client *BlockSync) doRequest( // We are conflating in the single error returned both status and validation // errors. Peer penalization should happen here then, before returning, so // we can apply the correct penalties depending on the cause of the error. -func (client *BlockSync) processResponse( - req *Request, - res *Response, - // FIXME: Add the `peer` as argument once we implement penalties. -) (*validatedResponse, error) { +// FIXME: Add the `peer` as argument once we implement penalties. +func (c *client) processResponse(req *Request, res *Response) (*validatedResponse, error) { err := res.statusToError() if err != nil { return nil, xerrors.Errorf("status error: %s", err) @@ -246,16 +240,8 @@ func (client *BlockSync) processResponse( return validRes, nil } -// GetBlocks fetches count blocks from the network, from the provided tipset -// *backwards*, returning as many tipsets as count. -// -// {hint/usage}: This is used by the Syncer during normal chain syncing and when -// resolving forks. -func (client *BlockSync) GetBlocks( - ctx context.Context, - tsk types.TipSetKey, - count int, -) ([]*types.TipSet, error) { +// GetBlocks implements Client.GetBlocks(). Refer to the godocs there. +func (c *client) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") defer span.End() if span.IsRecordingEvents() { @@ -271,7 +257,7 @@ func (client *BlockSync) GetBlocks( Options: Headers, } - validRes, err := client.doRequest(ctx, req, nil) + validRes, err := c.doRequest(ctx, req, nil) if err != nil { return nil, err } @@ -279,11 +265,8 @@ func (client *BlockSync) GetBlocks( return validRes.tipsets, nil } -func (client *BlockSync) GetFullTipSet( - ctx context.Context, - peer peer.ID, - tsk types.TipSetKey, -) (*store.FullTipSet, error) { +// GetFullTipSet implements Client.GetFullTipSet(). Refer to the godocs there. +func (c *client) GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) { // TODO: round robin through these peers on error req := &Request{ @@ -292,7 +275,7 @@ func (client *BlockSync) GetFullTipSet( Options: Headers | Messages, } - validRes, err := client.doRequest(ctx, req, &peer) + validRes, err := c.doRequest(ctx, req, &peer) if err != nil { return nil, err } @@ -302,11 +285,8 @@ func (client *BlockSync) GetFullTipSet( // *one* tipset here, so it's safe to index directly. } -func (client *BlockSync) GetChainMessages( - ctx context.Context, - head *types.TipSet, - length uint64, -) ([]*CompactedMessages, error) { +// GetChainMessages implements Client.GetChainMessages(). Refer to the godocs there. +func (c *client) GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) { ctx, span := trace.StartSpan(ctx, "GetChainMessages") if span.IsRecordingEvents() { span.AddAttributes( @@ -322,7 +302,7 @@ func (client *BlockSync) GetChainMessages( Options: Messages, } - validRes, err := client.doRequest(ctx, req, nil) + validRes, err := c.doRequest(ctx, req, nil) if err != nil { return nil, err } @@ -333,11 +313,7 @@ func (client *BlockSync) GetChainMessages( // Send a request to a peer. Write request in the stream and read the // response back. We do not do any processing of the request/response // here. -func (client *BlockSync) sendRequestToPeer( - ctx context.Context, - peer peer.ID, - req *Request, -) (_ *Response, err error) { +func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Request) (_ *Response, err error) { // Trace code. ctx, span := trace.StartSpan(ctx, "sendRequestToPeer") defer span.End() @@ -358,7 +334,7 @@ func (client *BlockSync) sendRequestToPeer( }() // -- TRACE -- - supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) + supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) if err != nil { return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } @@ -372,20 +348,20 @@ func (client *BlockSync) sendRequestToPeer( connectionStart := build.Clock.Now() // Open stream to peer. - stream, err := client.host.NewStream( - inet.WithNoDial(ctx, "should already have connection"), + stream, err := c.host.NewStream( + network.WithNoDial(ctx, "should already have connection"), peer, BlockSyncProtocolID) if err != nil { - client.RemovePeer(peer) + c.RemovePeer(peer) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } // Write request. - _ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE)) + _ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline)) if err := cborutil.WriteCborRPC(stream, req); err != nil { _ = stream.SetWriteDeadline(time.Time{}) - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) // FIXME: Should we also remove peer here? return nil, err } @@ -395,10 +371,10 @@ func (client *BlockSync) sendRequestToPeer( // Read response. var res Response err = cborutil.ReadCborRPC( - bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)), + bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)), &res) if err != nil { - client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) + c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) return nil, xerrors.Errorf("failed to read blocksync response: %w", err) } @@ -412,32 +388,34 @@ func (client *BlockSync) sendRequestToPeer( ) } - client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) + c.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) // FIXME: We should really log a success only after we validate the response. // It might be a bit hard to do. return &res, nil } -func (client *BlockSync) AddPeer(p peer.ID) { - client.peerTracker.addPeer(p) +// AddPeer implements Client.AddPeer(). Refer to the godocs there. +func (c *client) AddPeer(p peer.ID) { + c.peerTracker.addPeer(p) } -func (client *BlockSync) RemovePeer(p peer.ID) { - client.peerTracker.removePeer(p) +// RemovePeer implements Client.RemovePeer(). Refer to the godocs there. +func (c *client) RemovePeer(p peer.ID) { + c.peerTracker.removePeer(p) } // getShuffledPeers returns a preference-sorted set of peers (by latency // and failure counting), shuffling the first few peers so we don't always // pick the same peer. // FIXME: Consider merging with `shufflePrefix()s`. -func (client *BlockSync) getShuffledPeers() []peer.ID { - peers := client.peerTracker.prefSortedPeers() +func (c *client) getShuffledPeers() []peer.ID { + peers := c.peerTracker.prefSortedPeers() shufflePrefix(peers) return peers } func shufflePrefix(peers []peer.ID) { - prefix := SHUFFLE_PEERS_PREFIX + prefix := ShufflePeersPrefix if len(peers) < prefix { prefix = len(peers) } diff --git a/chain/exchange/doc.go b/chain/exchange/doc.go new file mode 100644 index 000000000..b20ee0c1f --- /dev/null +++ b/chain/exchange/doc.go @@ -0,0 +1,19 @@ +// Package exchange contains the ChainExchange server and client components. +// +// ChainExchange is the basic chain synchronization protocol of Filecoin. +// ChainExchange is an RPC-oriented protocol, with a single operation to +// request blocks for now. +// +// A request contains a start anchor block (referred to with a CID), and a +// amount of blocks requested beyond the anchor (including the anchor itself). +// +// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports +// two options at the moment: +// +// - include block contents +// - include block messages +// +// The response will include a status code, an optional message, and the +// response payload in case of success. The payload is a slice of serialized +// tipsets. +package exchange diff --git a/chain/exchange/interfaces.go b/chain/exchange/interfaces.go new file mode 100644 index 000000000..79d8fd4b1 --- /dev/null +++ b/chain/exchange/interfaces.go @@ -0,0 +1,51 @@ +package exchange + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" +) + +// Server is the responder side of the ChainExchange protocol. It accepts +// requests from clients and services them by returning the requested +// chain data. +type Server interface { + // HandleStream is the protocol handler to be registered on a libp2p + // protocol router. + // + // In the current version of the protocol, streams are single-use. The + // server will read a single Request, and will respond with a single + // Response. It will dispose of the stream straight after. + HandleStream(stream network.Stream) +} + +// Client is the requesting side of the ChainExchange protocol. It acts as +// a proxy for other components to request chain data from peers. It is chiefly +// used by the Syncer. +type Client interface { + // GetBlocks fetches block headers from the network, from the provided + // tipset *backwards*, returning as many tipsets as the count parameter, + // or less. + GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) + + // GetChainMessages fetches messages from the network, from the provided + // tipset *backwards*, returning the messages from as many tipsets as the + // count parameter, or less. + GetChainMessages(ctx context.Context, head *types.TipSet, length uint64) ([]*CompactedMessages, error) + + // GetFullTipSet fetches a full tipset from a given peer. If successful, + // the fetched object contains block headers and all messages in full form. + GetFullTipSet(ctx context.Context, peer peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) + + // AddPeer adds a peer to the pool of peers that the Client requests + // data from. + AddPeer(peer peer.ID) + + // RemovePeer removes a peer from the pool of peers that the Client + // requests data from. + RemovePeer(peer peer.ID) +} diff --git a/chain/blocksync/peer_tracker.go b/chain/exchange/peer_tracker.go similarity index 99% rename from chain/blocksync/peer_tracker.go rename to chain/exchange/peer_tracker.go index f1f6ede07..812868cec 100644 --- a/chain/blocksync/peer_tracker.go +++ b/chain/exchange/peer_tracker.go @@ -1,4 +1,4 @@ -package blocksync +package exchange // FIXME: This needs to be reviewed. diff --git a/chain/blocksync/protocol.go b/chain/exchange/protocol.go similarity index 94% rename from chain/blocksync/protocol.go rename to chain/exchange/protocol.go index 6a2861b80..3fdf06cc0 100644 --- a/chain/blocksync/protocol.go +++ b/chain/exchange/protocol.go @@ -1,4 +1,4 @@ -package blocksync +package exchange import ( "time" @@ -25,14 +25,16 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" // qualifier to avoid "const initializer [...] is not a constant" error.) var MaxRequestLength = uint64(build.ForkLengthThreshold) -// Extracted constants from the code. -// FIXME: Should be reviewed and confirmed. -const SUCCESS_PEER_TAG_VALUE = 25 -const WRITE_REQ_DEADLINE = 5 * time.Second -const READ_RES_DEADLINE = WRITE_REQ_DEADLINE -const READ_RES_MIN_SPEED = 50 << 10 -const SHUFFLE_PEERS_PREFIX = 5 -const WRITE_RES_DEADLINE = 60 * time.Second +const ( + // Extracted constants from the code. + // FIXME: Should be reviewed and confirmed. + SuccessPeerTagValue = 25 + WriteReqDeadline = 5 * time.Second + ReadResDeadline = WriteReqDeadline + ReadResMinSpeed = 50 << 10 + ShufflePeersPrefix = 5 + WriteResDeadline = 60 * time.Second +) // FIXME: Rename. Make private. type Request struct { diff --git a/chain/blocksync/server.go b/chain/exchange/server.go similarity index 75% rename from chain/blocksync/server.go rename to chain/exchange/server.go index ffdf79ad0..c69b9f0f2 100644 --- a/chain/blocksync/server.go +++ b/chain/exchange/server.go @@ -1,4 +1,4 @@ -package blocksync +package exchange import ( "bufio" @@ -18,37 +18,24 @@ import ( inet "github.com/libp2p/go-libp2p-core/network" ) -// BlockSyncService is the component that services BlockSync requests from -// peers. -// -// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync -// is an RPC-oriented protocol, with a single operation to request blocks. -// -// A request contains a start anchor block (referred to with a CID), and a -// amount of blocks requested beyond the anchor (including the anchor itself). -// -// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports -// two options at the moment: -// -// - include block contents -// - include block messages -// -// The response will include a status code, an optional message, and the -// response payload in case of success. The payload is a slice of serialized -// tipsets. -// FIXME: Rename to just `Server` (will be done later, see note on `BlockSync`). -type BlockSyncService struct { +// server implements exchange.Server. It services requests for the +// libp2p ChainExchange protocol. +type server struct { cs *store.ChainStore } -func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { - return &BlockSyncService{ +var _ Server = (*server)(nil) + +// NewServer creates a new libp2p-based exchange.Server. It services requests +// for the libp2p ChainExchange protocol. +func NewServer(cs *store.ChainStore) Server { + return &server{ cs: cs, } } -// Entry point of the service, handles `Request`s. -func (server *BlockSyncService) HandleStream(stream inet.Stream) { +// HandleStream implements Server.HandleStream. Refer to the godocs there. +func (s *server) HandleStream(stream inet.Stream) { ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") defer span.End() @@ -62,13 +49,13 @@ func (server *BlockSyncService) HandleStream(stream inet.Stream) { log.Infow("block sync request", "start", req.Head, "len", req.Length) - resp, err := server.processRequest(ctx, &req) + resp, err := s.processRequest(ctx, &req) if err != nil { log.Warn("failed to process request: ", err) return } - _ = stream.SetDeadline(time.Now().Add(WRITE_RES_DEADLINE)) + _ = stream.SetDeadline(time.Now().Add(WriteResDeadline)) if err := cborutil.WriteCborRPC(stream, resp); err != nil { _ = stream.SetDeadline(time.Time{}) log.Warnw("failed to write back response for handle stream", @@ -80,10 +67,7 @@ func (server *BlockSyncService) HandleStream(stream inet.Stream) { // Validate and service the request. We return either a protocol // response or an internal error. -func (server *BlockSyncService) processRequest( - ctx context.Context, - req *Request, -) (*Response, error) { +func (s *server) processRequest(ctx context.Context, req *Request) (*Response, error) { validReq, errResponse := validateRequest(ctx, req) if errResponse != nil { // The request did not pass validation, return the response @@ -91,16 +75,13 @@ func (server *BlockSyncService) processRequest( return errResponse, nil } - return server.serviceRequest(ctx, validReq) + return s.serviceRequest(ctx, validReq) } // Validate request. We either return a `validatedRequest`, or an error // `Response` indicating why we can't process it. We do not return any // internal errors here, we just signal protocol ones. -func validateRequest( - ctx context.Context, - req *Request, -) (*validatedRequest, *Response) { +func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Response) { _, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") defer span.End() @@ -147,14 +128,11 @@ func validateRequest( return &validReq, nil } -func (server *BlockSyncService) serviceRequest( - ctx context.Context, - req *validatedRequest, -) (*Response, error) { +func (s *server) serviceRequest(ctx context.Context, req *validatedRequest) (*Response, error) { _, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") defer span.End() - chain, err := collectChainSegment(server.cs, req) + chain, err := collectChainSegment(s.cs, req) if err != nil { log.Warn("block sync request: collectChainSegment failed: ", err) return &Response{ @@ -174,10 +152,7 @@ func (server *BlockSyncService) serviceRequest( }, nil } -func collectChainSegment( - cs *store.ChainStore, - req *validatedRequest, -) ([]*BSTipSet, error) { +func collectChainSegment(cs *store.ChainStore, req *validatedRequest) ([]*BSTipSet, error) { var bstips []*BSTipSet cur := req.head diff --git a/chain/sync.go b/chain/sync.go index 1b1cbdde9..06789e14a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -36,7 +36,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/beacon" - "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/stmgr" @@ -50,7 +50,7 @@ import ( ) // Blocks that are more than MaxHeightDrift epochs above -//the theoretical max height based on systime are quickly rejected +// the theoretical max height based on systime are quickly rejected const MaxHeightDrift = 5 var defaultMessageFetchWindowSize = 200 @@ -87,7 +87,7 @@ var LocalIncoming = "incoming" // The Syncer does not run workers itself. It's mainly concerned with // ensuring a consistent state of chain consensus. The reactive and network- // interfacing processes are part of other components, such as the SyncManager -// (which owns the sync scheduler and sync workers), BlockSync, the HELLO +// (which owns the sync scheduler and sync workers), client, the HELLO // protocol, and the gossipsub block propagation layer. // // {hint/concept} The fork-choice rule as it currently stands is: "pick the @@ -110,7 +110,7 @@ type Syncer struct { bad *BadBlockCache // handle to the block sync service - Bsync *blocksync.BlockSync + Exchange exchange.Client self peer.ID @@ -128,7 +128,7 @@ type Syncer struct { } // NewSyncer creates a new Syncer object. -func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { +func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { return nil, xerrors.Errorf("getting genesis block: %w", err) @@ -143,7 +143,7 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connm beacon: beacon, bad: NewBadBlockCache(), Genesis: gent, - Bsync: bsync, + Exchange: exchange, store: sm.ChainStore(), sm: sm, self: self, @@ -220,7 +220,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { return false } - syncer.Bsync.AddPeer(from) + syncer.Exchange.AddPeer(from) bestPweight := syncer.store.GetHeaviestTipSet().ParentWeight() targetWeight := fts.TipSet().ParentWeight() @@ -451,7 +451,7 @@ func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cid.Cid) (cid.Cid, e } // FetchTipSet tries to load the provided tipset from the store, and falls back -// to the network (BlockSync) by querying the supplied peer if not found +// to the network (client) by querying the supplied peer if not found // locally. // // {hint/usage} This is used from the HELLO protocol, to fetch the greeting @@ -462,7 +462,7 @@ func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipS } // fall back to the network. - return syncer.Bsync.GetFullTipSet(ctx, p, tsk) + return syncer.Exchange.GetFullTipSet(ctx, p, tsk) } // tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full @@ -1164,7 +1164,7 @@ func extractSyncState(ctx context.Context) *SyncerState { // total equality of the BeaconEntries in each block. // 3. Traverse the chain backwards, for each tipset: // 3a. Load it from the chainstore; if found, it move on to its parent. -// 3b. Query our peers via BlockSync in batches, requesting up to a +// 3b. Query our peers via client in batches, requesting up to a // maximum of 500 tipsets every time. // // Once we've concluded, if we find a mismatching tipset at the height where the @@ -1265,7 +1265,7 @@ loop: if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window { window = gap } - blks, err := syncer.Bsync.GetBlocks(ctx, at, window) + blks, err := syncer.Exchange.GetBlocks(ctx, at, window) if err != nil { // Most likely our peers aren't fully synced yet, but forwarded // new block message (ideally we'd find better peers) @@ -1283,7 +1283,7 @@ loop: // have. Since we fetch from the head backwards our reassembled chain // is sorted in reverse here: we have a child -> parent order, our last // tipset then should be child of the first tipset retrieved. - // FIXME: The reassembly logic should be part of the `BlockSync` + // FIXME: The reassembly logic should be part of the `client` // service, the consumer should not be concerned with the // `MaxRequestLength` limitation, it should just be able to request // an segment of arbitrary length. The same burden is put on @@ -1357,7 +1357,7 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold") // denylist. Else, we find the common ancestor, and add the missing chain // fragment until the fork point to the returned []TipSet. func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { - tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) + tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) if err != nil { return nil, err } @@ -1438,12 +1438,12 @@ mainLoop: nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index - var bstout []*blocksync.CompactedMessages + var bstout []*exchange.CompactedMessages for len(bstout) < batchSize { next := headers[nextI] nreq := batchSize - len(bstout) - bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(nreq)) + bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq)) if err != nil { // TODO check errors for temporary nature if windowSize > 1 { @@ -1488,8 +1488,8 @@ mainLoop: if i >= windowSize { newWindowSize := windowSize + 10 - if newWindowSize > int(blocksync.MaxRequestLength) { - newWindowSize = int(blocksync.MaxRequestLength) + if newWindowSize > int(exchange.MaxRequestLength) { + newWindowSize = int(exchange.MaxRequestLength) } if newWindowSize > windowSize { windowSize = newWindowSize @@ -1506,7 +1506,7 @@ mainLoop: return nil } -func persistMessages(bs bstore.Blockstore, bst *blocksync.CompactedMessages) error { +func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid()) if _, err := store.PutMessage(bs, m); err != nil { diff --git a/gen/main.go b/gen/main.go index e062f6a2e..1a9894a7e 100644 --- a/gen/main.go +++ b/gen/main.go @@ -7,7 +7,7 @@ import ( gen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/paychmgr" @@ -64,10 +64,10 @@ func main() { } err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", - blocksync.Request{}, - blocksync.Response{}, - blocksync.CompactedMessages{}, - blocksync.BSTipSet{}, + exchange.Request{}, + exchange.Response{}, + exchange.CompactedMessages{}, + exchange.BSTipSet{}, ) if err != nil { fmt.Println(err) diff --git a/node/builder.go b/node/builder.go index 5b6966cd4..3749059c5 100644 --- a/node/builder.go +++ b/node/builder.go @@ -29,7 +29,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" - "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/market" @@ -243,7 +243,7 @@ func Online() Option { // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), - Override(new(*blocksync.BlockSync), blocksync.NewClient), + Override(new(exchange.Client), exchange.NewClient), Override(new(*messagepool.MessagePool), modules.MessagePool), Override(new(modules.Genesis), modules.ErrorGenesis), @@ -252,7 +252,7 @@ func Online() Option { Override(new(dtypes.NetworkName), modules.NetworkName), Override(new(*hello.Service), hello.NewHelloService), - Override(new(*blocksync.BlockSyncService), blocksync.NewBlockSyncService), + Override(new(exchange.Server), exchange.NewServer), Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(new(dtypes.Graphsync), modules.Graphsync), diff --git a/node/modules/chain.go b/node/modules/chain.go index ea04945ef..36257d055 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -20,7 +20,7 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" - "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" @@ -163,8 +163,8 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, return netName, err } -func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) { - syncer, err := chain.NewSyncer(sm, bsync, h.ConnManager(), h.ID(), beacon, verifier) +func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) { + syncer, err := chain.NewSyncer(sm, exchange, h.ConnManager(), h.ID(), beacon, verifier) if err != nil { return nil, err } diff --git a/node/modules/services.go b/node/modules/services.go index fc7486abe..c6889d8ba 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon/drand" - "github.com/filecoin-project/lotus/chain/blocksync" + "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" @@ -69,8 +69,8 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) } -func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) { - h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream) +func RunBlockSync(h host.Host, svc exchange.Server) { + h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) } func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { From 453e826a0fdb43bad94d8ad6d9fa00bbb5a202b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 7 Sep 2020 19:45:34 +0100 Subject: [PATCH 2/3] rename p2p protocol to /fil/chain/xchg/0.0.1 (backwards-compatible); rename more. --- chain/exchange/client.go | 14 ++++++-------- chain/exchange/protocol.go | 14 +++++++++++--- chain/exchange/server.go | 6 +++--- chain/sync.go | 2 +- cli/log.go | 2 +- gen/main.go | 2 +- node/builder.go | 4 ++-- node/modules/services.go | 5 +++-- 8 files changed, 28 insertions(+), 21 deletions(-) diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 2133b7805..76b93fb6c 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -334,15 +334,13 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque }() // -- TRACE -- - supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) + supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID, ChainExchangeProtocolID) if err != nil { return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } - if len(supported) == 0 || supported[0] != BlockSyncProtocolID { - return nil, xerrors.Errorf("peer %s does not support protocol %s", - peer, BlockSyncProtocolID) - // FIXME: `ProtoBook` should support a *single* protocol check that returns - // a bool instead of a list. + if len(supported) == 0 || (supported[0] != BlockSyncProtocolID && supported[0] != ChainExchangeProtocolID) { + return nil, xerrors.Errorf("peer %s does not support protocols %s", + peer, []string{BlockSyncProtocolID, ChainExchangeProtocolID}) } connectionStart := build.Clock.Now() @@ -351,7 +349,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque stream, err := c.host.NewStream( network.WithNoDial(ctx, "should already have connection"), peer, - BlockSyncProtocolID) + ChainExchangeProtocolID, BlockSyncProtocolID) if err != nil { c.RemovePeer(peer) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) @@ -375,7 +373,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque &res) if err != nil { c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) - return nil, xerrors.Errorf("failed to read blocksync response: %w", err) + return nil, xerrors.Errorf("failed to read chainxchg response: %w", err) } // FIXME: Move all this together at the top using a defer as done elsewhere. diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index 3fdf06cc0..ca4b61836 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -13,9 +13,17 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -var log = logging.Logger("blocksync") +var log = logging.Logger("chainxchg") -const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" +const ( + // BlockSyncProtocolID is the protocol ID of the former blocksync protocol. + // Deprecated. + BlockSyncProtocolID = "/fil/sync/blk/0.0.1" + + // ChainExchangeProtocolID is the protocol ID of the chain exchange + // protocol. + ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1" +) // FIXME: Bumped from original 800 to this to accommodate `syncFork()` // use of `GetBlocks()`. It seems the expectation of that API is to @@ -119,7 +127,7 @@ func (res *Response) statusToError() error { case NotFound: return xerrors.Errorf("not found") case GoAway: - return xerrors.Errorf("not handling 'go away' blocksync responses yet") + return xerrors.Errorf("not handling 'go away' chainxchg responses yet") case InternalError: return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage) case BadRequest: diff --git a/chain/exchange/server.go b/chain/exchange/server.go index c69b9f0f2..54e169b3f 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -36,7 +36,7 @@ func NewServer(cs *store.ChainStore) Server { // HandleStream implements Server.HandleStream. Refer to the godocs there. func (s *server) HandleStream(stream inet.Stream) { - ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") + ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream") defer span.End() defer stream.Close() //nolint:errcheck @@ -82,7 +82,7 @@ func (s *server) processRequest(ctx context.Context, req *Request) (*Response, e // `Response` indicating why we can't process it. We do not return any // internal errors here, we just signal protocol ones. func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Response) { - _, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") + _, span := trace.StartSpan(ctx, "chainxchg.ValidateRequest") defer span.End() validReq := validatedRequest{} @@ -129,7 +129,7 @@ func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Res } func (s *server) serviceRequest(ctx context.Context, req *validatedRequest) (*Response, error) { - _, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") + _, span := trace.StartSpan(ctx, "chainxchg.ServiceRequest") defer span.End() chain, err := collectChainSegment(s.cs, req) diff --git a/chain/sync.go b/chain/sync.go index 06789e14a..f980531fc 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -87,7 +87,7 @@ var LocalIncoming = "incoming" // The Syncer does not run workers itself. It's mainly concerned with // ensuring a consistent state of chain consensus. The reactive and network- // interfacing processes are part of other components, such as the SyncManager -// (which owns the sync scheduler and sync workers), client, the HELLO +// (which owns the sync scheduler and sync workers), ChainExchange, the HELLO // protocol, and the gossipsub block propagation layer. // // {hint/concept} The fork-choice rule as it currently stands is: "pick the diff --git a/cli/log.go b/cli/log.go index b551b5645..ed624eb8d 100644 --- a/cli/log.go +++ b/cli/log.go @@ -49,7 +49,7 @@ var logSetLevel = &cli.Command{ The system flag can be specified multiple times. - eg) log set-level --system chain --system blocksync debug + eg) log set-level --system chain --system chainxchg debug Available Levels: debug diff --git a/gen/main.go b/gen/main.go index 1a9894a7e..e7586a92a 100644 --- a/gen/main.go +++ b/gen/main.go @@ -63,7 +63,7 @@ func main() { os.Exit(1) } - err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", + err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange", exchange.Request{}, exchange.Response{}, exchange.CompactedMessages{}, diff --git a/node/builder.go b/node/builder.go index 3749059c5..6ccab21de 100644 --- a/node/builder.go +++ b/node/builder.go @@ -103,7 +103,7 @@ const ( SetGenesisKey RunHelloKey - RunBlockSyncKey + RunChainExchangeKey RunChainGraphsync RunPeerMgrKey @@ -259,7 +259,7 @@ func Online() Option { Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(RunHelloKey, modules.RunHello), - Override(RunBlockSyncKey, modules.RunBlockSync), + Override(RunChainExchangeKey, modules.RunChainExchange), Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), diff --git a/node/modules/services.go b/node/modules/services.go index c6889d8ba..b54a14bb1 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -69,8 +69,9 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) } -func RunBlockSync(h host.Host, svc exchange.Server) { - h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) +func RunChainExchange(h host.Host, svc exchange.Server) { + h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) // old + h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { From 878ffafb51d9b9251c438231e179d00d1ebb646f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 7 Sep 2020 20:20:23 +0100 Subject: [PATCH 3/3] rename other Chain{Exchange=>Bitswap}; fix ChainBlock{s=>S}ervice(). --- node/builder.go | 4 ++-- node/modules/chain.go | 4 ++-- node/modules/dtypes/storage.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/node/builder.go b/node/builder.go index 6ccab21de..128f44bd3 100644 --- a/node/builder.go +++ b/node/builder.go @@ -238,8 +238,8 @@ func Online() Option { Override(new(dtypes.ChainGCLocker), blockstore.NewGCLocker), Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore), - Override(new(dtypes.ChainExchange), modules.ChainExchange), - Override(new(dtypes.ChainBlockService), modules.ChainBlockservice), + Override(new(dtypes.ChainBitswap), modules.ChainBitswap), + Override(new(dtypes.ChainBlockService), modules.ChainBlockService), // Filecoin services Override(new(*chain.Syncer), modules.NewSyncer), diff --git a/node/modules/chain.go b/node/modules/chain.go index 36257d055..1f398d0d8 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -33,7 +33,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) -func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainExchange { +func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainBitswap { // prefix protocol for chain bitswap // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) @@ -83,7 +83,7 @@ func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtyp return blockstore.NewGCBlockstore(bs, gcl) } -func ChainBlockservice(bs dtypes.ChainBlockstore, rem dtypes.ChainExchange) dtypes.ChainBlockService { +func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { return blockservice.New(bs, rem) } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index b8c1c3081..13defda8d 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -27,7 +27,7 @@ type ChainBlockstore blockstore.Blockstore type ChainGCLocker blockstore.GCLocker type ChainGCBlockstore blockstore.GCBlockstore -type ChainExchange exchange.Interface +type ChainBitswap exchange.Interface type ChainBlockService bserv.BlockService type ClientMultiDstore *multistore.MultiStore