Integrate go-nitro node for payments #256

Merged
ashwin merged 12 commits from deep-stack/ipld-eth-server:pm-integrate-go-nitro into payments 2023-10-13 09:03:11 +00:00
4 changed files with 118 additions and 328 deletions
Showing only changes of commit e9e17c2be1 - Show all commits

View File

@ -17,6 +17,7 @@ package cmd
import (
"errors"
"math/big"
"net/http"
"net/url"
"os"
@ -26,16 +27,22 @@ import (
"time"
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
"github.com/cerc-io/ipld-eth-server/v5/pkg/payments"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mailgun/groupcache/v2"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/statechannels/go-nitro/node/engine"
"github.com/statechannels/go-nitro/node/engine/chainservice"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/statechannels/go-nitro/paymentsmanager"
"github.com/cerc-io/ipld-eth-server/v5/pkg/graphql"
srpc "github.com/cerc-io/ipld-eth-server/v5/pkg/rpc"
s "github.com/cerc-io/ipld-eth-server/v5/pkg/serve"
v "github.com/cerc-io/ipld-eth-server/v5/version"
nitroNode "github.com/statechannels/go-nitro/node"
nitrop2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
)
var ErrNoRpcEndpoints = errors.New("no rpc endpoints is available")
@ -76,7 +83,24 @@ func serve() {
}
server.Serve(wg)
if err := startServers(server, serverConfig); err != nil {
// TODO: Create required config for Nitro node
nitroNode, err := initializeNitroNode()
if err != nil {
panic(err)
}
pm, err := paymentsmanager.NewPaymentsManager(nitroNode)
if err != nil {
panic(err)
}
pm.Start(wg)
voucherValidator := paymentsmanager.InProcessValidator{PaymentsManager: pm}
queryRates := map[string]*big.Int{"eth_getBlockByNumber": big.NewInt(50)}
if err := startServers(server, serverConfig, voucherValidator, queryRates); err != nil {
logWithCommand.Fatal(err)
}
graphQL, err := startEthGraphQL(server, serverConfig)
@ -96,10 +120,8 @@ func serve() {
logWithCommand.Debug("state validator disabled")
}
// TODO: Create required config for Nitro node
paymentsManager, _ := payments.NewPaymentsManager(true)
paymentsManager.Start(wg)
// paymentsManager, _ := payments.NewPaymentsManager(true)
// paymentsManager.Start(wg)
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt)
@ -108,12 +130,12 @@ func serve() {
graphQL.Stop()
}
server.Stop()
paymentsManager.Stop()
pm.Stop()
wg.Wait()
}
func startServers(server s.Server, settings *s.Config) error {
func startServers(server s.Server, settings *s.Config, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) error {
if settings.IPCEnabled {
logWithCommand.Debug("starting up IPC server")
_, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
@ -136,7 +158,7 @@ func startServers(server s.Server, settings *s.Config) error {
if settings.HTTPEnabled {
logWithCommand.Debug("starting up HTTP server")
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}, voucherValidator, queryRates)
if err != nil {
return err
}
@ -347,3 +369,81 @@ func init() {
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
}
func initializeNitroNode() (*nitroNode.Node, error) {
// TODO: Configure
pkString := ""
useDurableStore := true
durableStoreFolder := "./data/nitro-store"
msgPort := 3005
wsMsgPort := 5005
chainUrl := "ws://127.0.0.1:8545"
chainStartBlock := uint64(0)
chainPk := ""
naAddress := ""
vpaAddress := ""
caAddress := ""
chainAuthToken := ""
publicIp := "0.0.0.0"
chainOpts := chainservice.ChainOpts{
ChainUrl: chainUrl,
ChainStartBlock: chainStartBlock,
ChainAuthToken: chainAuthToken,
ChainPk: chainPk,
NaAddress: common.HexToAddress(naAddress),
VpaAddress: common.HexToAddress(vpaAddress),
CaAddress: common.HexToAddress(caAddress),
}
storeOpts := store.StoreOpts{
PkBytes: common.Hex2Bytes(pkString),
UseDurableStore: useDurableStore,
DurableStoreFolder: durableStoreFolder,
}
peerSlice := []string{}
messageOpts := nitrop2pms.MessageOpts{
PkBytes: common.Hex2Bytes(pkString),
TcpPort: msgPort,
WsMsgPort: wsMsgPort,
BootPeers: peerSlice,
PublicIp: publicIp,
}
ourStore, err := store.NewStore(storeOpts)
if err != nil {
return nil, err
}
log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort)
messageOpts.SCAddr = *ourStore.GetAddress()
messageService := nitrop2pms.NewMessageService(messageOpts)
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
// gets passed as an argument when creating NewEthChainService
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
if err != nil {
return nil, err
}
if storeBlockNum > chainOpts.ChainStartBlock {
chainOpts.ChainStartBlock = storeBlockNum
}
log.Info("Initializing chain service...")
ourChain, err := chainservice.NewEthChainService(chainOpts)
if err != nil {
return nil, err
}
node := nitroNode.New(
messageService,
ourChain,
ourStore,
&engine.PermissivePolicy{},
)
return &node, nil
}

2
go.mod
View File

@ -11,7 +11,6 @@ require (
github.com/ethereum/go-ethereum v1.12.0 // TODO: Investigate
github.com/google/uuid v1.3.0
github.com/graph-gophers/graphql-go v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/ipfs/go-cid v0.4.1
github.com/jmoiron/sqlx v1.3.5
github.com/joho/godotenv v1.4.0
@ -91,6 +90,7 @@ require (
github.com/hashicorp/go-bexpr v0.1.12 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect

View File

@ -1,314 +0,0 @@
// VulcanizeDB
// Copyright © 2023 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package payments
import (
"errors"
"math/big"
"sync"
"time"
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
"github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/statechannels/go-nitro/node/engine"
"github.com/statechannels/go-nitro/node/engine/chainservice"
"github.com/statechannels/go-nitro/node/engine/store"
"github.com/statechannels/go-nitro/payments"
nitroNode "github.com/statechannels/go-nitro/node"
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
nitroTypes "github.com/statechannels/go-nitro/types"
)
const (
DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000
DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000
DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000
DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL
DEFAULT_VOUCHER_CHECK_INTERVAL = 2
DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5
)
var (
ERR_PAYMENT_NOT_RECEIVED = errors.New("Payment not received")
ERR_AMOUNT_INSUFFICIENT = errors.New("Payment amount insufficient")
)
type Payment struct {
voucher payments.Voucher
amount *big.Int
}
// Struct representing payments manager
// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node
type PaymentsManager struct {
// Whether to run an in-process Nitro node
runInProcessNitroNode bool
// In-process Nitro node; nil when runInProcessNitroNode is false
nitroNode *nitroNode.Node
receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]]
paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int]
// Used to signal shutdown of the service
quitChan chan bool
}
func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) {
var err error
pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode}
if runInProcessNitroNode {
pm.nitroNode, err = initializeNitroNode()
if err != nil {
return PaymentsManager{}, err
}
}
pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]](
DEFAULT_LRU_CACHE_MAX_ACCOUNTS,
nil,
time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL,
)
pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int](
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS,
nil,
time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL,
)
// Load existing open payment channels with amount paid so far from the stored state
err = pm.loadPaymentChannels()
if err != nil {
return PaymentsManager{}, err
}
return pm, nil
}
func (pm *PaymentsManager) Start(wg *sync.WaitGroup) {
log.Info("starting payments manager")
// Channel for received vouchers
var receivedVouchers <-chan payments.Voucher
if pm.runInProcessNitroNode {
receivedVouchers = pm.nitroNode.ReceivedVouchers()
} else {
// TODO: Get vouchers from an out-of-process go-nitro node
}
wg.Add(1)
go func() {
defer wg.Done()
pm.run(receivedVouchers)
}()
}
func (pm *PaymentsManager) Stop() error {
log.Info("stopping payments manager")
close(pm.quitChan)
return nil
}
func (pm *PaymentsManager) AuthenticatePayment(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error {
// Check the payments map for required voucher
var isPaymentReceived, isOfSufficientValue bool
for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ {
isPaymentReceived, isOfSufficientValue = pm.acceptReceivedPayment(voucherHash, signerAddress, value)
if isPaymentReceived {
if !isOfSufficientValue {
return ERR_AMOUNT_INSUFFICIENT
}
return nil
}
// Retry after an interval if voucher not found
log.Debugf("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL)
time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second)
}
return ERR_PAYMENT_NOT_RECEIVED
}
// Check for a given payment voucher in LRU cache map
// Returns whether the voucher was found, whether it was of sufficient value
func (pm *PaymentsManager) acceptReceivedPayment(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) {
paymentsMap, ok := pm.receivedPayments.Get(signerAddress)
if !ok {
return false, false
}
receivedPayment, ok := paymentsMap.Get(voucherHash)
if !ok {
return false, false
}
if receivedPayment.amount.Cmp(minRequiredValue) < 0 {
return true, false
}
// Delete the voucher from map after consuming it
paymentsMap.Remove(voucherHash)
return true, true
}
func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) {
log.Info("starting voucher subscription...")
for {
select {
case voucher := <-receivedVouchers:
payer, err := pm.getChannelCounterparty(voucher.ChannelId)
if err != nil {
// TODO: Handle
panic(err)
}
paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId)
if !ok {
paidSoFar = big.NewInt(0)
}
paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar)
pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount)
log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String())
paymentsMap, ok := pm.receivedPayments.Get(payer)
if !ok {
paymentsMap = expirable.NewLRU[common.Hash, Payment](
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT,
nil,
DEFAULT_LRU_CACHE_VOUCHER_TTL,
)
pm.receivedPayments.Add(payer, paymentsMap)
}
voucherHash, err := voucher.Hash()
if err != nil {
// TODO: Handle
panic(err)
}
paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount})
case <-pm.quitChan:
log.Info("stopping voucher subscription loop")
// TODO: stop the nitro node if that means anything
return
}
}
}
func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) {
associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId)
if err != nil {
return common.Address{}, err
}
return associatedPaymentChannel.Balance.Payer, nil
}
func (pm *PaymentsManager) loadPaymentChannels() error {
// TODO: Implement
return nil
}
func initializeNitroNode() (*nitroNode.Node, error) {
// TODO: Configure
pkString := ""
useDurableStore := true
durableStoreFolder := "./data/nitro-store"
msgPort := 3005
wsMsgPort := 5005
chainUrl := "ws://127.0.0.1:8546"
chainStartBlock := uint64(0)
chainPk := ""
naAddress := ""
vpaAddress := ""
caAddress := ""
chainAuthToken := ""
publicIp := "0.0.0.0"
chainOpts := chainservice.ChainOpts{
ChainUrl: chainUrl,
ChainStartBlock: chainStartBlock,
ChainAuthToken: chainAuthToken,
ChainPk: chainPk,
NaAddress: common.HexToAddress(naAddress),
VpaAddress: common.HexToAddress(vpaAddress),
CaAddress: common.HexToAddress(caAddress),
}
storeOpts := store.StoreOpts{
PkBytes: common.Hex2Bytes(pkString),
UseDurableStore: useDurableStore,
DurableStoreFolder: durableStoreFolder,
}
peerSlice := []string{}
messageOpts := p2pms.MessageOpts{
PkBytes: common.Hex2Bytes(pkString),
TcpPort: msgPort,
WsMsgPort: wsMsgPort,
BootPeers: peerSlice,
PublicIp: publicIp,
}
ourStore, err := store.NewStore(storeOpts)
if err != nil {
return nil, err
}
log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort)
messageOpts.SCAddr = *ourStore.GetAddress()
messageService := p2pms.NewMessageService(messageOpts)
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
// gets passed as an argument when creating NewEthChainService
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
if err != nil {
return nil, err
}
if storeBlockNum > chainOpts.ChainStartBlock {
chainOpts.ChainStartBlock = storeBlockNum
}
log.Info("Initializing chain service...")
ourChain, err := chainservice.NewEthChainService(chainOpts)
if err != nil {
return nil, err
}
node := nitroNode.New(
messageService,
ourChain,
ourStore,
&engine.PermissivePolicy{},
)
return &node, nil
}

View File

@ -18,27 +18,31 @@ package rpc
import (
"fmt"
"math/big"
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/statechannels/go-nitro/paymentsmanager"
"github.com/cerc-io/ipld-eth-server/v5/pkg/prom"
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) {
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) (*rpc.Server, error) {
srv := rpc.NewServer()
err := node.RegisterApis(apis, modules, srv)
if err != nil {
utils.Fatalf("Could not register HTTP API: %w", err)
}
handler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil))
promHandler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil))
paymentHandler := paymentsmanager.HTTPMiddleware(promHandler, voucherValidator, queryRates)
// start http server
_, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, handler)
// request -> payments -> metrics -> server
_, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, paymentHandler)
if err != nil {
utils.Fatalf("Could not start RPC api: %v", err)
}