From 453e826a0fdb43bad94d8ad6d9fa00bbb5a202b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 7 Sep 2020 19:45:34 +0100 Subject: [PATCH] rename p2p protocol to /fil/chain/xchg/0.0.1 (backwards-compatible); rename more. --- chain/exchange/client.go | 14 ++++++-------- chain/exchange/protocol.go | 14 +++++++++++--- chain/exchange/server.go | 6 +++--- chain/sync.go | 2 +- cli/log.go | 2 +- gen/main.go | 2 +- node/builder.go | 4 ++-- node/modules/services.go | 5 +++-- 8 files changed, 28 insertions(+), 21 deletions(-) diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 2133b7805..76b93fb6c 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -334,15 +334,13 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque }() // -- TRACE -- - supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) + supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID, ChainExchangeProtocolID) if err != nil { return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) } - if len(supported) == 0 || supported[0] != BlockSyncProtocolID { - return nil, xerrors.Errorf("peer %s does not support protocol %s", - peer, BlockSyncProtocolID) - // FIXME: `ProtoBook` should support a *single* protocol check that returns - // a bool instead of a list. + if len(supported) == 0 || (supported[0] != BlockSyncProtocolID && supported[0] != ChainExchangeProtocolID) { + return nil, xerrors.Errorf("peer %s does not support protocols %s", + peer, []string{BlockSyncProtocolID, ChainExchangeProtocolID}) } connectionStart := build.Clock.Now() @@ -351,7 +349,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque stream, err := c.host.NewStream( network.WithNoDial(ctx, "should already have connection"), peer, - BlockSyncProtocolID) + ChainExchangeProtocolID, BlockSyncProtocolID) if err != nil { c.RemovePeer(peer) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) @@ -375,7 +373,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque &res) if err != nil { c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) - return nil, xerrors.Errorf("failed to read blocksync response: %w", err) + return nil, xerrors.Errorf("failed to read chainxchg response: %w", err) } // FIXME: Move all this together at the top using a defer as done elsewhere. diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index 3fdf06cc0..ca4b61836 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -13,9 +13,17 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -var log = logging.Logger("blocksync") +var log = logging.Logger("chainxchg") -const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" +const ( + // BlockSyncProtocolID is the protocol ID of the former blocksync protocol. + // Deprecated. + BlockSyncProtocolID = "/fil/sync/blk/0.0.1" + + // ChainExchangeProtocolID is the protocol ID of the chain exchange + // protocol. + ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1" +) // FIXME: Bumped from original 800 to this to accommodate `syncFork()` // use of `GetBlocks()`. It seems the expectation of that API is to @@ -119,7 +127,7 @@ func (res *Response) statusToError() error { case NotFound: return xerrors.Errorf("not found") case GoAway: - return xerrors.Errorf("not handling 'go away' blocksync responses yet") + return xerrors.Errorf("not handling 'go away' chainxchg responses yet") case InternalError: return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage) case BadRequest: diff --git a/chain/exchange/server.go b/chain/exchange/server.go index c69b9f0f2..54e169b3f 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -36,7 +36,7 @@ func NewServer(cs *store.ChainStore) Server { // HandleStream implements Server.HandleStream. Refer to the godocs there. func (s *server) HandleStream(stream inet.Stream) { - ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") + ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream") defer span.End() defer stream.Close() //nolint:errcheck @@ -82,7 +82,7 @@ func (s *server) processRequest(ctx context.Context, req *Request) (*Response, e // `Response` indicating why we can't process it. We do not return any // internal errors here, we just signal protocol ones. func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Response) { - _, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") + _, span := trace.StartSpan(ctx, "chainxchg.ValidateRequest") defer span.End() validReq := validatedRequest{} @@ -129,7 +129,7 @@ func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Res } func (s *server) serviceRequest(ctx context.Context, req *validatedRequest) (*Response, error) { - _, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") + _, span := trace.StartSpan(ctx, "chainxchg.ServiceRequest") defer span.End() chain, err := collectChainSegment(s.cs, req) diff --git a/chain/sync.go b/chain/sync.go index 06789e14a..f980531fc 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -87,7 +87,7 @@ var LocalIncoming = "incoming" // The Syncer does not run workers itself. It's mainly concerned with // ensuring a consistent state of chain consensus. The reactive and network- // interfacing processes are part of other components, such as the SyncManager -// (which owns the sync scheduler and sync workers), client, the HELLO +// (which owns the sync scheduler and sync workers), ChainExchange, the HELLO // protocol, and the gossipsub block propagation layer. // // {hint/concept} The fork-choice rule as it currently stands is: "pick the diff --git a/cli/log.go b/cli/log.go index b551b5645..ed624eb8d 100644 --- a/cli/log.go +++ b/cli/log.go @@ -49,7 +49,7 @@ var logSetLevel = &cli.Command{ The system flag can be specified multiple times. - eg) log set-level --system chain --system blocksync debug + eg) log set-level --system chain --system chainxchg debug Available Levels: debug diff --git a/gen/main.go b/gen/main.go index 1a9894a7e..e7586a92a 100644 --- a/gen/main.go +++ b/gen/main.go @@ -63,7 +63,7 @@ func main() { os.Exit(1) } - err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", + err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange", exchange.Request{}, exchange.Response{}, exchange.CompactedMessages{}, diff --git a/node/builder.go b/node/builder.go index 3749059c5..6ccab21de 100644 --- a/node/builder.go +++ b/node/builder.go @@ -103,7 +103,7 @@ const ( SetGenesisKey RunHelloKey - RunBlockSyncKey + RunChainExchangeKey RunChainGraphsync RunPeerMgrKey @@ -259,7 +259,7 @@ func Online() Option { Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(RunHelloKey, modules.RunHello), - Override(RunBlockSyncKey, modules.RunBlockSync), + Override(RunChainExchangeKey, modules.RunChainExchange), Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), diff --git a/node/modules/services.go b/node/modules/services.go index c6889d8ba..b54a14bb1 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -69,8 +69,9 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) } -func RunBlockSync(h host.Host, svc exchange.Server) { - h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) +func RunChainExchange(h host.Host, svc exchange.Server) { + h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) // old + h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {