rename p2p protocol to /fil/chain/xchg/0.0.1 (backwards-compatible); rename more.

This commit is contained in:
Raúl Kripalani 2020-09-07 19:45:34 +01:00
parent 55b1456d45
commit 453e826a0f
8 changed files with 28 additions and 21 deletions

View File

@ -334,15 +334,13 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
}() }()
// -- TRACE -- // -- TRACE --
supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) supported, err := c.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID, ChainExchangeProtocolID)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
} }
if len(supported) == 0 || supported[0] != BlockSyncProtocolID { if len(supported) == 0 || (supported[0] != BlockSyncProtocolID && supported[0] != ChainExchangeProtocolID) {
return nil, xerrors.Errorf("peer %s does not support protocol %s", return nil, xerrors.Errorf("peer %s does not support protocols %s",
peer, BlockSyncProtocolID) peer, []string{BlockSyncProtocolID, ChainExchangeProtocolID})
// FIXME: `ProtoBook` should support a *single* protocol check that returns
// a bool instead of a list.
} }
connectionStart := build.Clock.Now() connectionStart := build.Clock.Now()
@ -351,7 +349,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
stream, err := c.host.NewStream( stream, err := c.host.NewStream(
network.WithNoDial(ctx, "should already have connection"), network.WithNoDial(ctx, "should already have connection"),
peer, peer,
BlockSyncProtocolID) ChainExchangeProtocolID, BlockSyncProtocolID)
if err != nil { if err != nil {
c.RemovePeer(peer) c.RemovePeer(peer)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err) return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
@ -375,7 +373,7 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
&res) &res)
if err != nil { if err != nil {
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart))
return nil, xerrors.Errorf("failed to read blocksync response: %w", err) return nil, xerrors.Errorf("failed to read chainxchg response: %w", err)
} }
// FIXME: Move all this together at the top using a defer as done elsewhere. // FIXME: Move all this together at the top using a defer as done elsewhere.

View File

@ -13,9 +13,17 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
var log = logging.Logger("blocksync") var log = logging.Logger("chainxchg")
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" const (
// BlockSyncProtocolID is the protocol ID of the former blocksync protocol.
// Deprecated.
BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
// ChainExchangeProtocolID is the protocol ID of the chain exchange
// protocol.
ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
)
// FIXME: Bumped from original 800 to this to accommodate `syncFork()` // FIXME: Bumped from original 800 to this to accommodate `syncFork()`
// use of `GetBlocks()`. It seems the expectation of that API is to // use of `GetBlocks()`. It seems the expectation of that API is to
@ -119,7 +127,7 @@ func (res *Response) statusToError() error {
case NotFound: case NotFound:
return xerrors.Errorf("not found") return xerrors.Errorf("not found")
case GoAway: case GoAway:
return xerrors.Errorf("not handling 'go away' blocksync responses yet") return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
case InternalError: case InternalError:
return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage) return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
case BadRequest: case BadRequest:

View File

@ -36,7 +36,7 @@ func NewServer(cs *store.ChainStore) Server {
// HandleStream implements Server.HandleStream. Refer to the godocs there. // HandleStream implements Server.HandleStream. Refer to the godocs there.
func (s *server) HandleStream(stream inet.Stream) { func (s *server) HandleStream(stream inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
defer span.End() defer span.End()
defer stream.Close() //nolint:errcheck defer stream.Close() //nolint:errcheck
@ -82,7 +82,7 @@ func (s *server) processRequest(ctx context.Context, req *Request) (*Response, e
// `Response` indicating why we can't process it. We do not return any // `Response` indicating why we can't process it. We do not return any
// internal errors here, we just signal protocol ones. // internal errors here, we just signal protocol ones.
func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Response) { func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Response) {
_, span := trace.StartSpan(ctx, "blocksync.ValidateRequest") _, span := trace.StartSpan(ctx, "chainxchg.ValidateRequest")
defer span.End() defer span.End()
validReq := validatedRequest{} validReq := validatedRequest{}
@ -129,7 +129,7 @@ func validateRequest(ctx context.Context, req *Request) (*validatedRequest, *Res
} }
func (s *server) serviceRequest(ctx context.Context, req *validatedRequest) (*Response, error) { func (s *server) serviceRequest(ctx context.Context, req *validatedRequest) (*Response, error) {
_, span := trace.StartSpan(ctx, "blocksync.ServiceRequest") _, span := trace.StartSpan(ctx, "chainxchg.ServiceRequest")
defer span.End() defer span.End()
chain, err := collectChainSegment(s.cs, req) chain, err := collectChainSegment(s.cs, req)

View File

@ -87,7 +87,7 @@ var LocalIncoming = "incoming"
// The Syncer does not run workers itself. It's mainly concerned with // The Syncer does not run workers itself. It's mainly concerned with
// ensuring a consistent state of chain consensus. The reactive and network- // ensuring a consistent state of chain consensus. The reactive and network-
// interfacing processes are part of other components, such as the SyncManager // interfacing processes are part of other components, such as the SyncManager
// (which owns the sync scheduler and sync workers), client, the HELLO // (which owns the sync scheduler and sync workers), ChainExchange, the HELLO
// protocol, and the gossipsub block propagation layer. // protocol, and the gossipsub block propagation layer.
// //
// {hint/concept} The fork-choice rule as it currently stands is: "pick the // {hint/concept} The fork-choice rule as it currently stands is: "pick the

View File

@ -49,7 +49,7 @@ var logSetLevel = &cli.Command{
The system flag can be specified multiple times. The system flag can be specified multiple times.
eg) log set-level --system chain --system blocksync debug eg) log set-level --system chain --system chainxchg debug
Available Levels: Available Levels:
debug debug

View File

@ -63,7 +63,7 @@ func main() {
os.Exit(1) os.Exit(1)
} }
err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange",
exchange.Request{}, exchange.Request{},
exchange.Response{}, exchange.Response{},
exchange.CompactedMessages{}, exchange.CompactedMessages{},

View File

@ -103,7 +103,7 @@ const (
SetGenesisKey SetGenesisKey
RunHelloKey RunHelloKey
RunBlockSyncKey RunChainExchangeKey
RunChainGraphsync RunChainGraphsync
RunPeerMgrKey RunPeerMgrKey
@ -259,7 +259,7 @@ func Online() Option {
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
Override(RunHelloKey, modules.RunHello), Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync), Override(RunChainExchangeKey, modules.RunChainExchange),
Override(RunPeerMgrKey, modules.RunPeerMgr), Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),

View File

@ -69,8 +69,9 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr)
go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
} }
func RunBlockSync(h host.Host, svc exchange.Server) { func RunChainExchange(h host.Host, svc exchange.Server) {
h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) h.SetStreamHandler(exchange.BlockSyncProtocolID, svc.HandleStream) // old
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
} }
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {