[wip] nitro p2p integration

This commit is contained in:
Roy Crihfield 2025-02-10 19:03:05 +08:00
parent 64d1a6d283
commit 10410ad61b
3 changed files with 201 additions and 46 deletions

View File

@ -15,7 +15,7 @@ const (
FlagAssetAddress = "asset"
)
func (s *Server) OpenChannel() *cobra.Command {
func (s *Server) OpenChannelCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "open-channel <amount>", // TODO: doc args?
Short: "Open a payment channel with the Laconic custodian",
@ -28,7 +28,7 @@ func (s *Server) OpenChannel() *cobra.Command {
// TODO
// use Coin types for eth and tokens?
// how do we map denoms to addresses?
// how do we map denoms to token addresses?
amount, ok := new(big.Int).SetString(args[1], 10)
if !ok {
return fmt.Errorf("invalid amount: %s", args[1])

151
server/nitro/p2p.go Normal file
View File

@ -0,0 +1,151 @@
package nitro
import (
"fmt"
cmtcfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/transport"
"github.com/cosmos/gogoproto/proto"
"github.com/statechannels/go-nitro/node/engine/messageservice"
"github.com/statechannels/go-nitro/protocols"
)
const (
P2PMessageChannel = byte(0x80)
// SigRequestChannel = byte(0x81)
)
type msgService struct {
incoming chan protocols.Message
outgoing chan protocols.Message
signreqs chan messageservice.SignatureRequest // dummy channel, we don't need these
}
// reactor handles incoming Nitro P2P messages via CometBFT
type reactor struct {
p2p.BaseReactor
ms *msgService
consConfig *cmtcfg.ConsensusConfig
}
type streamDesc struct {
id byte
msgType proto.Message
}
func (sd streamDesc) StreamID() byte { return sd.id }
// TODO: convert Message to protobuf
func (sd streamDesc) MessageType() proto.Message { return sd.msgType }
// MessageService
func newMessageService() *msgService {
return &msgService{
incoming: make(chan protocols.Message),
outgoing: make(chan protocols.Message),
signreqs: make(chan messageservice.SignatureRequest),
}
}
func (ms *msgService) P2PMessages() <-chan protocols.Message {
return ms.incoming
}
func (ms *msgService) SignRequests() <-chan messageservice.SignatureRequest {
// TODO: not needed, make this optional in nitro
return ms.signreqs
}
func (ms *msgService) Send(m protocols.Message) error {
ms.outgoing <- m
return nil
}
func (ms *msgService) Close() error {
close(ms.incoming)
close(ms.outgoing)
close(ms.signreqs)
return nil
}
func (ms *msgService) Id() string
// Reactor
func newReactor(ms *msgService) *reactor {
ret := &reactor{ms: ms}
ret.BaseReactor = *p2p.NewBaseReactor("nitro-reactor", ret)
return ret
}
// StreamDescriptors returns the stream descriptor for Nitro messages.
func (r *reactor) StreamDescriptors() []transport.StreamDescriptor {
return []transport.StreamDescriptor{
streamDesc{P2PMessageChannel, &messageservice.P2PMessage{}},
// streamDesc{SigRequestChannel, &messageservice.SignatureRequest{}},
}
}
// func (r *reactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
// AddPeer begins sending messages to a peer.
func (r *reactor) AddPeer(peer p2p.Peer) {
if !r.IsRunning() {
return
}
go r.streamTo(peer)
}
func (r *reactor) streamTo(peer p2p.Peer) {
logger := r.Logger.With("peer", peer)
if !peer.HasChannel(P2PMessageChannel) {
logger.Info("Peer does not implement NitroMsgChannel")
return
}
for {
if !peer.IsRunning() || !r.IsRunning() {
return
}
for {
select {
case msg := <-r.ms.outgoing:
err := peer.Send(p2p.Envelope{
Message: &msg,
ChannelID: P2PMessageChannel,
})
if err != nil {
logger.Error("Failed to send message", "msg", msg, "err", err)
return
}
default:
break
}
}
// time.Sleep(r.consConfig.PeerGossipSleepDuration)
}
}
// func (r *reactor) RemovePeer(peer p2p.Peer, reason any)
// Receive handles an envelope received from any connected peer on any registered channel.
func (r *reactor) Receive(e p2p.Envelope) {
if !r.IsRunning() {
r.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
return
}
switch e.ChannelID {
case P2PMessageChannel:
switch msg := e.Message.(type) {
case *protocols.Message:
r.ms.incoming <- *msg
default:
r.Logger.Error(fmt.Sprintf("Unknown message type: %T", e.Message))
}
default:
r.Logger.Error(fmt.Sprintf("Unknown channel ID: %X", e.ChannelID))
}
}

View File

@ -19,9 +19,9 @@ import (
"github.com/statechannels/go-nitro/node/engine/store"
nitrotypes "github.com/statechannels/go-nitro/types"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/runtime/v2"
serverv2 "cosmossdk.io/server/v2"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
@ -49,21 +49,25 @@ var (
type Server struct {
*node.Node
ms *msgService
logger log.Logger
config *Config
logger log.Logger
config *Config
storeDir string // path to Nitro store directory
// path to Nitro store directory
storeDir string
EthAddress common.Address
StateChannelAddress nitrotypes.PartyAddress
ethAddress common.Address
stateChannelAddress nitrotypes.PartyAddress
}
func NewServer(logger log.Logger, cfg *Config, kr keyring.Keyring, storeDir string) (*Server, error) {
func NewServer(logger log.Logger, globalConfig server.ConfigMap, kr keyring.Keyring) (*Server, error) {
home, _ := globalConfig[serverv2.FlagHome].(string)
cfg, err := UnmarshalConfig(globalConfig)
if err != nil {
return nil, err
}
s := &Server{
storeDir: storeDir,
config: cfg,
storeDir: filepath.Join(home, "nitro"),
}
s.logger = logger.With(log.ModuleKey, s.Name())
return s, s.init(kr)
@ -80,10 +84,8 @@ func (s *Server) init(kr keyring.Keyring) error {
return err
}
} else {
// TODO for server, inject signer or callback into nitro node instead of naked privkey
// client case is simple signature
// for validator, multisignatures can be negotiated over abci txs, then loaded into signer
// Sign(message) only passes once validator's signature is prepared
// TODO for server, inject signer callback into nitro node instead of naked privkey
// multisig will be negotiated over ABCI, then funds are sent via custodian contract
s.logger.Warn("(DEV) using fake Laconic custodian Ethereum private key")
ethkey = &LaconicCustodianPrivKey
}
@ -114,20 +116,21 @@ func (s *Server) init(kr keyring.Keyring) error {
ChainUrl: c.EthUrl,
ChainStartBlockNum: c.EthStartBlock,
ChainAuthToken: c.EthAuthToken,
// TODO delegation to distsig
// TODO see above
ChainPk: hex.EncodeToString(ethkey.Bytes()),
NaAddress: common.HexToAddress(c.EthNaAddress),
VpaAddress: common.HexToAddress(c.EthVpaAddress),
CaAddress: common.HexToAddress(c.EthCaAddress),
}
// Inject SDK logger into slog, which Nitro uses
// inject SDK logger into slog, which Nitro uses
loggerImpl, ok := s.logger.Impl().(*slog.Logger)
if !ok {
s.logger.Warn("logger does not have slog implementation")
}
slog.SetDefault(loggerImpl)
node, store, _, _, err := initNode(s.logger, chainOpts, storeOpts, messageOpts, &engine.PermissivePolicy{})
s.ms = newMessageService()
node, store, _, err := initNode(s.logger, chainOpts, storeOpts, s.ms)
if err != nil {
return err
}
@ -137,8 +140,8 @@ func (s *Server) init(kr keyring.Keyring) error {
return err
}
s.Node = node
s.EthAddress = crypto.PubkeyToAddress(*ethPubkey)
s.StateChannelAddress = *store.GetAddress()
s.ethAddress = crypto.PubkeyToAddress(*ethPubkey)
s.stateChannelAddress = *store.GetAddress()
return nil
}
@ -146,31 +149,30 @@ func initNode(
logger log.Logger,
chainOpts chainservice.ChainOpts,
storeOpts store.StoreOpts,
messageOpts p2pms.MessageOpts,
policymaker engine.PolicyMaker,
// messageOpts p2pms.MessageOpts,
messageService *msgService,
) (
*node.Node,
store.Store,
*p2pms.P2PMessageService,
chainservice.ChainService,
// chainservice.ChainService,
error,
) {
ourStore, err := store.NewStore(storeOpts)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
logger.Info("Initializing message service",
"tcp port", messageOpts.TcpPort,
"web socket port", messageOpts.WsMsgPort)
messageOpts.SCAddr = *ourStore.GetAddress()
messageService := p2pms.NewMessageService(messageOpts)
// logger.Info("Initializing message service",
// "tcp port", messageOpts.TcpPort,
// "web socket port", messageOpts.WsMsgPort)
// messageOpts.SCAddr = *ourStore.GetAddress()
// messageService := p2pms.NewMessageService(messageOpts)
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
// gets passed as an argument when creating NewEthChainService
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
if storeBlockNum > chainOpts.ChainStartBlockNum {
chainOpts.ChainStartBlockNum = storeBlockNum
@ -179,17 +181,16 @@ func initNode(
logger.Info("Initializing chain service...")
ourChain, err := chainservice.NewL1ChainService(chainOpts)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, err
}
node := node.New(
messageService,
ourChain,
ourStore,
policymaker,
&engine.PermissivePolicy{},
)
return node, ourStore, messageService, ourChain, nil
return node, ourStore, nil
}
func (s *Server) Name() string {
@ -225,13 +226,16 @@ func (s *Server) StartCmdFlags() *pflag.FlagSet {
func (s *Server) CLICommands() serverv2.CLIConfig {
return serverv2.CLIConfig{
Commands: []*cobra.Command{
// command to create payment channel
// s.FundCmd(),
s.OpenChannelCmd(),
},
// Queries: []*cobra.Command{},
}
}
func (s *Server) CometReactor() *reactor {
return newReactor(s.ms)
}
func UnmarshalConfig(cfg map[string]any) (*Config, error) {
config := DefaultConfig()
if err := serverv2.UnmarshalSubConfig(cfg, serverName, config); err != nil {
@ -240,11 +244,11 @@ func UnmarshalConfig(cfg map[string]any) (*Config, error) {
return config, nil
}
func ProvideServer(logger log.Logger, globalConfig runtime.GlobalConfig, kr keyring.Keyring) (*Server, error) {
home, _ := globalConfig[serverv2.FlagHome].(string)
config, err := UnmarshalConfig(globalConfig)
if err != nil {
return nil, err
}
return NewServer(logger, config, kr, filepath.Join(home, "nitro"))
}
// func ProvideServer(logger log.Logger, globalConfig runtime.GlobalConfig, kr keyring.Keyring) (*Server, error) {
// home, _ := globalConfig[serverv2.FlagHome].(string)
// config, err := UnmarshalConfig(globalConfig)
// if err != nil {
// return nil, err
// }
// return NewServer(logger, config, kr, filepath.Join(home, "nitro"))
// }