diff --git a/cmd/serve.go b/cmd/serve.go index c529d6c0..a5ccea1b 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -17,6 +17,7 @@ package cmd import ( "errors" + "math/big" "net/http" "net/url" "os" @@ -26,16 +27,22 @@ import ( "time" "github.com/cerc-io/ipld-eth-server/v5/pkg/log" - "github.com/cerc-io/ipld-eth-server/v5/pkg/payments" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/mailgun/groupcache/v2" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/statechannels/go-nitro/node/engine" + "github.com/statechannels/go-nitro/node/engine/chainservice" + "github.com/statechannels/go-nitro/node/engine/store" + "github.com/statechannels/go-nitro/paymentsmanager" "github.com/cerc-io/ipld-eth-server/v5/pkg/graphql" srpc "github.com/cerc-io/ipld-eth-server/v5/pkg/rpc" s "github.com/cerc-io/ipld-eth-server/v5/pkg/serve" v "github.com/cerc-io/ipld-eth-server/v5/version" + nitroNode "github.com/statechannels/go-nitro/node" + nitrop2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" ) var ErrNoRpcEndpoints = errors.New("no rpc endpoints is available") @@ -76,7 +83,24 @@ func serve() { } server.Serve(wg) - if err := startServers(server, serverConfig); err != nil { + + // TODO: Create required config for Nitro node + nitroNode, err := initializeNitroNode() + if err != nil { + panic(err) + } + + pm, err := paymentsmanager.NewPaymentsManager(nitroNode) + if err != nil { + panic(err) + } + + pm.Start(wg) + + voucherValidator := paymentsmanager.InProcessValidator{PaymentsManager: pm} + queryRates := map[string]*big.Int{"eth_getBlockByNumber": big.NewInt(50)} + + if err := startServers(server, serverConfig, voucherValidator, queryRates); err != nil { logWithCommand.Fatal(err) } graphQL, err := startEthGraphQL(server, serverConfig) @@ -96,10 +120,8 @@ func serve() { logWithCommand.Debug("state validator disabled") } - // TODO: Create required config for Nitro node - - paymentsManager, _ := payments.NewPaymentsManager(true) - paymentsManager.Start(wg) + // paymentsManager, _ := payments.NewPaymentsManager(true) + // paymentsManager.Start(wg) shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt) @@ -108,12 +130,12 @@ func serve() { graphQL.Stop() } server.Stop() - paymentsManager.Stop() + pm.Stop() wg.Wait() } -func startServers(server s.Server, settings *s.Config) error { +func startServers(server s.Server, settings *s.Config, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) error { if settings.IPCEnabled { logWithCommand.Debug("starting up IPC server") _, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) @@ -136,7 +158,7 @@ func startServers(server s.Server, settings *s.Config) error { if settings.HTTPEnabled { logWithCommand.Debug("starting up HTTP server") - _, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + _, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}, voucherValidator, queryRates) if err != nil { return err } @@ -347,3 +369,81 @@ func init() { viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled")) viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block")) } + +func initializeNitroNode() (*nitroNode.Node, error) { + // TODO: Configure + pkString := "" + useDurableStore := true + durableStoreFolder := "./data/nitro-store" + msgPort := 3005 + wsMsgPort := 5005 + chainUrl := "ws://127.0.0.1:8545" + chainStartBlock := uint64(0) + chainPk := "" + naAddress := "" + vpaAddress := "" + caAddress := "" + + chainAuthToken := "" + publicIp := "0.0.0.0" + + chainOpts := chainservice.ChainOpts{ + ChainUrl: chainUrl, + ChainStartBlock: chainStartBlock, + ChainAuthToken: chainAuthToken, + ChainPk: chainPk, + NaAddress: common.HexToAddress(naAddress), + VpaAddress: common.HexToAddress(vpaAddress), + CaAddress: common.HexToAddress(caAddress), + } + + storeOpts := store.StoreOpts{ + PkBytes: common.Hex2Bytes(pkString), + UseDurableStore: useDurableStore, + DurableStoreFolder: durableStoreFolder, + } + + peerSlice := []string{} + + messageOpts := nitrop2pms.MessageOpts{ + PkBytes: common.Hex2Bytes(pkString), + TcpPort: msgPort, + WsMsgPort: wsMsgPort, + BootPeers: peerSlice, + PublicIp: publicIp, + } + + ourStore, err := store.NewStore(storeOpts) + if err != nil { + return nil, err + } + + log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort) + messageOpts.SCAddr = *ourStore.GetAddress() + messageService := nitrop2pms.NewMessageService(messageOpts) + + // Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two + // gets passed as an argument when creating NewEthChainService + storeBlockNum, err := ourStore.GetLastBlockNumSeen() + if err != nil { + return nil, err + } + if storeBlockNum > chainOpts.ChainStartBlock { + chainOpts.ChainStartBlock = storeBlockNum + } + + log.Info("Initializing chain service...") + ourChain, err := chainservice.NewEthChainService(chainOpts) + if err != nil { + return nil, err + } + + node := nitroNode.New( + messageService, + ourChain, + ourStore, + &engine.PermissivePolicy{}, + ) + + return &node, nil +} diff --git a/go.mod b/go.mod index 966726bb..c8d5e0f6 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/ethereum/go-ethereum v1.12.0 // TODO: Investigate github.com/google/uuid v1.3.0 github.com/graph-gophers/graphql-go v1.3.0 - github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/ipfs/go-cid v0.4.1 github.com/jmoiron/sqlx v1.3.5 github.com/joho/godotenv v1.4.0 @@ -91,6 +90,7 @@ require ( github.com/hashicorp/go-bexpr v0.1.12 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect + github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect diff --git a/pkg/payments/payments_manager.go b/pkg/payments/payments_manager.go deleted file mode 100644 index bce32616..00000000 --- a/pkg/payments/payments_manager.go +++ /dev/null @@ -1,314 +0,0 @@ -// VulcanizeDB -// Copyright © 2023 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package payments - -import ( - "errors" - "math/big" - "sync" - "time" - - "github.com/cerc-io/ipld-eth-server/v5/pkg/log" - "github.com/ethereum/go-ethereum/common" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/statechannels/go-nitro/node/engine" - "github.com/statechannels/go-nitro/node/engine/chainservice" - "github.com/statechannels/go-nitro/node/engine/store" - "github.com/statechannels/go-nitro/payments" - - nitroNode "github.com/statechannels/go-nitro/node" - p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" - nitroTypes "github.com/statechannels/go-nitro/types" -) - -const ( - DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000 - DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins - DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000 - DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins - DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000 - DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL - - DEFAULT_VOUCHER_CHECK_INTERVAL = 2 - DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5 -) - -var ( - ERR_PAYMENT_NOT_RECEIVED = errors.New("Payment not received") - ERR_AMOUNT_INSUFFICIENT = errors.New("Payment amount insufficient") -) - -type Payment struct { - voucher payments.Voucher - amount *big.Int -} - -// Struct representing payments manager -// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node -type PaymentsManager struct { - // Whether to run an in-process Nitro node - runInProcessNitroNode bool - - // In-process Nitro node; nil when runInProcessNitroNode is false - nitroNode *nitroNode.Node - - receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]] - - paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int] - - // Used to signal shutdown of the service - quitChan chan bool -} - -func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) { - var err error - - pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode} - - if runInProcessNitroNode { - pm.nitroNode, err = initializeNitroNode() - if err != nil { - return PaymentsManager{}, err - } - } - - pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]]( - DEFAULT_LRU_CACHE_MAX_ACCOUNTS, - nil, - time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL, - ) - - pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int]( - DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS, - nil, - time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL, - ) - - // Load existing open payment channels with amount paid so far from the stored state - err = pm.loadPaymentChannels() - if err != nil { - return PaymentsManager{}, err - } - - return pm, nil -} - -func (pm *PaymentsManager) Start(wg *sync.WaitGroup) { - log.Info("starting payments manager") - - // Channel for received vouchers - var receivedVouchers <-chan payments.Voucher - if pm.runInProcessNitroNode { - receivedVouchers = pm.nitroNode.ReceivedVouchers() - } else { - // TODO: Get vouchers from an out-of-process go-nitro node - } - - wg.Add(1) - go func() { - defer wg.Done() - pm.run(receivedVouchers) - }() -} - -func (pm *PaymentsManager) Stop() error { - log.Info("stopping payments manager") - close(pm.quitChan) - return nil -} - -func (pm *PaymentsManager) AuthenticatePayment(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error { - // Check the payments map for required voucher - var isPaymentReceived, isOfSufficientValue bool - for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ { - isPaymentReceived, isOfSufficientValue = pm.acceptReceivedPayment(voucherHash, signerAddress, value) - - if isPaymentReceived { - if !isOfSufficientValue { - return ERR_AMOUNT_INSUFFICIENT - } - - return nil - } - - // Retry after an interval if voucher not found - log.Debugf("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL) - time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second) - } - - return ERR_PAYMENT_NOT_RECEIVED -} - -// Check for a given payment voucher in LRU cache map -// Returns whether the voucher was found, whether it was of sufficient value -func (pm *PaymentsManager) acceptReceivedPayment(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) { - paymentsMap, ok := pm.receivedPayments.Get(signerAddress) - if !ok { - return false, false - } - - receivedPayment, ok := paymentsMap.Get(voucherHash) - if !ok { - return false, false - } - - if receivedPayment.amount.Cmp(minRequiredValue) < 0 { - return true, false - } - - // Delete the voucher from map after consuming it - paymentsMap.Remove(voucherHash) - return true, true -} - -func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) { - log.Info("starting voucher subscription...") - for { - select { - case voucher := <-receivedVouchers: - payer, err := pm.getChannelCounterparty(voucher.ChannelId) - if err != nil { - // TODO: Handle - panic(err) - } - - paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId) - if !ok { - paidSoFar = big.NewInt(0) - } - - paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar) - pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount) - log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String()) - - paymentsMap, ok := pm.receivedPayments.Get(payer) - if !ok { - paymentsMap = expirable.NewLRU[common.Hash, Payment]( - DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT, - nil, - DEFAULT_LRU_CACHE_VOUCHER_TTL, - ) - - pm.receivedPayments.Add(payer, paymentsMap) - } - - voucherHash, err := voucher.Hash() - if err != nil { - // TODO: Handle - panic(err) - } - - paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount}) - case <-pm.quitChan: - log.Info("stopping voucher subscription loop") - // TODO: stop the nitro node if that means anything - return - } - } -} - -func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) { - associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId) - if err != nil { - return common.Address{}, err - } - - return associatedPaymentChannel.Balance.Payer, nil -} - -func (pm *PaymentsManager) loadPaymentChannels() error { - // TODO: Implement - return nil -} - -func initializeNitroNode() (*nitroNode.Node, error) { - // TODO: Configure - pkString := "" - useDurableStore := true - durableStoreFolder := "./data/nitro-store" - msgPort := 3005 - wsMsgPort := 5005 - chainUrl := "ws://127.0.0.1:8546" - chainStartBlock := uint64(0) - chainPk := "" - naAddress := "" - vpaAddress := "" - caAddress := "" - - chainAuthToken := "" - publicIp := "0.0.0.0" - - chainOpts := chainservice.ChainOpts{ - ChainUrl: chainUrl, - ChainStartBlock: chainStartBlock, - ChainAuthToken: chainAuthToken, - ChainPk: chainPk, - NaAddress: common.HexToAddress(naAddress), - VpaAddress: common.HexToAddress(vpaAddress), - CaAddress: common.HexToAddress(caAddress), - } - - storeOpts := store.StoreOpts{ - PkBytes: common.Hex2Bytes(pkString), - UseDurableStore: useDurableStore, - DurableStoreFolder: durableStoreFolder, - } - - peerSlice := []string{} - - messageOpts := p2pms.MessageOpts{ - PkBytes: common.Hex2Bytes(pkString), - TcpPort: msgPort, - WsMsgPort: wsMsgPort, - BootPeers: peerSlice, - PublicIp: publicIp, - } - - ourStore, err := store.NewStore(storeOpts) - if err != nil { - return nil, err - } - - log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort) - messageOpts.SCAddr = *ourStore.GetAddress() - messageService := p2pms.NewMessageService(messageOpts) - - // Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two - // gets passed as an argument when creating NewEthChainService - storeBlockNum, err := ourStore.GetLastBlockNumSeen() - if err != nil { - return nil, err - } - if storeBlockNum > chainOpts.ChainStartBlock { - chainOpts.ChainStartBlock = storeBlockNum - } - - log.Info("Initializing chain service...") - ourChain, err := chainservice.NewEthChainService(chainOpts) - if err != nil { - return nil, err - } - - node := nitroNode.New( - messageService, - ourChain, - ourStore, - &engine.PermissivePolicy{}, - ) - - return &node, nil -} diff --git a/pkg/rpc/http.go b/pkg/rpc/http.go index 0dc9d780..66fd4430 100644 --- a/pkg/rpc/http.go +++ b/pkg/rpc/http.go @@ -18,27 +18,31 @@ package rpc import ( "fmt" + "math/big" "github.com/cerc-io/ipld-eth-server/v5/pkg/log" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" + "github.com/statechannels/go-nitro/paymentsmanager" "github.com/cerc-io/ipld-eth-server/v5/pkg/prom" ) // StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules. -func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) { - +func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) (*rpc.Server, error) { srv := rpc.NewServer() err := node.RegisterApis(apis, modules, srv) if err != nil { utils.Fatalf("Could not register HTTP API: %w", err) } - handler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil)) + + promHandler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil)) + paymentHandler := paymentsmanager.HTTPMiddleware(promHandler, voucherValidator, queryRates) // start http server - _, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, handler) + // request -> payments -> metrics -> server + _, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, paymentHandler) if err != nil { utils.Fatalf("Could not start RPC api: %v", err) }