diff --git a/api/client/client.go b/api/client/client.go index 2aaa05579..3a9f37eed 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -2,12 +2,12 @@ package client import ( "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/rpclib" + "github.com/filecoin-project/go-lotus/jsonrpc" ) // NewRPC creates a new http jsonrpc client. func NewRPC(addr string) api.API { var res api.Struct - rpclib.NewClient(addr, "Filecoin", &res.Internal) + jsonrpc.NewClient(addr, "Filecoin", &res.Internal) return &res } diff --git a/cborrpc/rpc.go b/cborrpc/rpc.go new file mode 100644 index 000000000..9aa8cd2f1 --- /dev/null +++ b/cborrpc/rpc.go @@ -0,0 +1,29 @@ +package cborrpc + +import ( + "io" + "io/ioutil" + + cbor "github.com/ipfs/go-ipld-cbor" +) + +const MessageSizeLimit = 1 << 20 + +func WriteCborRPC(w io.Writer, obj interface{}) error { + data, err := cbor.DumpObject(obj) + if err != nil { + return err + } + + _, err = w.Write(data) + return err +} + +func ReadCborRPC(r io.Reader, out interface{}) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + return cbor.DecodeInto(b, out) +} diff --git a/daemon/rpc.go b/daemon/rpc.go index 78df124c2..b280200d2 100644 --- a/daemon/rpc.go +++ b/daemon/rpc.go @@ -4,11 +4,11 @@ import ( "net/http" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/rpclib" + "github.com/filecoin-project/go-lotus/jsonrpc" ) func serveRPC(api api.API) error { - rpcServer := rpclib.NewServer() + rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api) http.Handle("/rpc/v0", rpcServer) return http.ListenAndServe(":1234", http.DefaultServeMux) diff --git a/rpclib/rpc_client.go b/jsonrpc/rpc_client.go similarity index 99% rename from rpclib/rpc_client.go rename to jsonrpc/rpc_client.go index f8116c825..e9418040b 100644 --- a/rpclib/rpc_client.go +++ b/jsonrpc/rpc_client.go @@ -1,4 +1,4 @@ -package rpclib +package jsonrpc import ( "bytes" diff --git a/rpclib/rpc_server.go b/jsonrpc/rpc_server.go similarity index 99% rename from rpclib/rpc_server.go rename to jsonrpc/rpc_server.go index 5ef1560e7..104380861 100644 --- a/rpclib/rpc_server.go +++ b/jsonrpc/rpc_server.go @@ -1,4 +1,4 @@ -package rpclib +package jsonrpc import ( "bytes" diff --git a/rpclib/rpc_test.go b/jsonrpc/rpc_test.go similarity index 99% rename from rpclib/rpc_test.go rename to jsonrpc/rpc_test.go index 4879a3e3e..d496e25c5 100644 --- a/rpclib/rpc_test.go +++ b/jsonrpc/rpc_test.go @@ -1,4 +1,4 @@ -package rpclib +package jsonrpc import ( "context" diff --git a/node/fxlog.go b/node/fxlog.go new file mode 100644 index 000000000..3f0e18d64 --- /dev/null +++ b/node/fxlog.go @@ -0,0 +1,17 @@ +package node + +import ( + logging "github.com/ipfs/go-log" + + "go.uber.org/fx" +) + +type debugPrinter struct { + l logging.StandardLogger +} + +func (p *debugPrinter) Printf(f string, a ...interface{}) { + p.l.Debugf(f, a...) +} + +var _ fx.Printer = new(debugPrinter) diff --git a/node/hello/hello.go b/node/hello/hello.go new file mode 100644 index 000000000..865d2c316 --- /dev/null +++ b/node/hello/hello.go @@ -0,0 +1,100 @@ +package hello + +import ( + "context" + + "github.com/filecoin-project/go-lotus/cborrpc" + "github.com/libp2p/go-libp2p-core/host" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +const ProtocolID = "/fil/hello/1.0.0" + +var log = logging.Logger("hello") + +func init() { + cbor.RegisterCborType(Message{}) +} + +type Message struct { + HeaviestTipSet []cid.Cid + HeaviestTipSetWeight uint64 + GenesisHash cid.Cid +} + +type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) + +type Service struct { + newStream NewStreamFunc + + //cs *ChainStore + //syncer *Syncer +} + +func NewHelloService(h host.Host) *Service { + return &Service{ + newStream: h.NewStream, + } +} + +func (hs *Service) HandleStream(s inet.Stream) { + defer s.Close() + + var hmsg Message + if err := cborrpc.ReadCborRPC(s, &hmsg); err != nil { + log.Infow("failed to read hello message", "error", err) + return + } + log.Debugw("genesis from hello", + "tipset", hmsg.HeaviestTipSet, + "peer", s.Conn().RemotePeer(), + "hash", hmsg.GenesisHash) + + /*if hmsg.GenesisHash != hs.syncer.genesis.Cids()[0] { + log.Error("other peer has different genesis!") + s.Conn().Close() + return + } + + ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet) + if err != nil { + log.Errorf("failed to fetch tipset from peer during hello: %s", err) + return + } + + hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)*/ +} + +func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { + /*s, err := hs.newStream(ctx, pid, ProtocolID) + if err != nil { + return err + } + + hts := hs.cs.GetHeaviestTipSet() + weight := hs.cs.Weight(hts) + gen, err := hs.cs.GetGenesis() + if err != nil { + return err + } + + hmsg := &Message{ + HeaviestTipSet: hts.Cids(), + HeaviestTipSetWeight: weight, + GenesisHash: gen.Cid(), + } + fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids()) + fmt.Println("hello message genesis: ", gen.Cid()) + + if err := WriteCborRPC(s, hmsg); err != nil { + return err + }*/ + + return nil +} diff --git a/node/modules/core.go b/node/modules/core.go index 5bc21fcf5..070cdef99 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -1,10 +1,13 @@ package modules import ( + logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" ) +var log = logging.Logger("modules") + // RecordValidator provides namesys compatible routing record validator func RecordValidator(ps peerstore.Peerstore) record.Validator { return record.NamespacedValidator{ diff --git a/node/modules/hello.go b/node/modules/hello.go new file mode 100644 index 000000000..aa1cdfeae --- /dev/null +++ b/node/modules/hello.go @@ -0,0 +1,25 @@ +package modules + +import ( + "github.com/filecoin-project/go-lotus/node/hello" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/libp2p/go-libp2p-core/host" + inet "github.com/libp2p/go-libp2p-core/network" + "go.uber.org/fx" +) + +func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) { + h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) + + bundle := inet.NotifyBundle{ + ConnectedF: func(_ inet.Network, c inet.Conn) { + go func() { + if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil { + log.Warnw("failed to say hello", "error", err) + return + } + }() + }, + } + h.Network().Notify(&bundle) +}