diff --git a/cmd/serve.go b/cmd/serve.go index a1fb8621..c529d6c0 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -26,6 +26,7 @@ import ( "time" "github.com/cerc-io/ipld-eth-server/v5/pkg/log" + "github.com/cerc-io/ipld-eth-server/v5/pkg/payments" "github.com/ethereum/go-ethereum/rpc" "github.com/mailgun/groupcache/v2" "github.com/spf13/cobra" @@ -95,6 +96,11 @@ func serve() { logWithCommand.Debug("state validator disabled") } + // TODO: Create required config for Nitro node + + paymentsManager, _ := payments.NewPaymentsManager(true) + paymentsManager.Start(wg) + shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt) <-shutdown @@ -102,6 +108,8 @@ func serve() { graphQL.Stop() } server.Stop() + paymentsManager.Stop() + wg.Wait() } diff --git a/go.mod b/go.mod index ed291fcd..4ba85589 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,10 @@ require ( github.com/cerc-io/ipfs-ethdb/v5 v5.0.0-alpha github.com/cerc-io/ipld-eth-statedb v0.0.5-alpha github.com/cerc-io/plugeth-statediff v0.1.1 - github.com/ethereum/go-ethereum v1.11.6 + github.com/ethereum/go-ethereum v1.12.0 // TODO: Investigate github.com/google/uuid v1.3.0 github.com/graph-gophers/graphql-go v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/ipfs/go-cid v0.4.1 github.com/jmoiron/sqlx v1.3.5 github.com/joho/godotenv v1.4.0 @@ -23,6 +24,7 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.11.0 + github.com/statechannels/go-nitro v0.1.1 gorm.io/driver/postgres v1.3.7 gorm.io/gorm v1.23.5 ) @@ -89,7 +91,6 @@ require ( github.com/hashicorp/go-bexpr v0.1.12 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect - github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect @@ -230,6 +231,14 @@ require ( github.com/subosito/gotenv v1.2.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/thoas/go-funk v0.9.3 // indirect + github.com/tidwall/btree v1.6.0 // indirect + github.com/tidwall/buntdb v1.2.10 // indirect + github.com/tidwall/gjson v1.14.4 // indirect + github.com/tidwall/grect v0.1.4 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/rtred v0.1.2 // indirect + github.com/tidwall/tinyqueue v0.1.1 // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/tyler-smith/go-bip39 v1.1.0 // indirect @@ -286,6 +295,8 @@ replace ( // TODO: Use releases replace ( - github.com/cerc-io/ipfs-ethdb/v5 => /home/prathamesh/deepstack/ipfs-ethdb github.com/cerc-io/eth-ipfs-state-validator/v5 => /home/prathamesh/deepstack/eth-ipfs-state-validator + github.com/cerc-io/ipfs-ethdb/v5 => /home/prathamesh/deepstack/ipfs-ethdb ) + +replace github.com/statechannels/go-nitro v0.1.1 => github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6 diff --git a/go.sum b/go.sum index 08cf696a..cef443e7 100644 --- a/go.sum +++ b/go.sum @@ -91,12 +91,16 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= +github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo= +github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ= +github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= @@ -108,6 +112,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/ceramicnetwork/go-dag-jose v0.1.0 h1:yJ/HVlfKpnD3LdYP03AHyTvbm3BpPiz2oZiOeReJRdU= github.com/ceramicnetwork/go-dag-jose v0.1.0/go.mod h1:qYA1nYt0X8u4XoMAVoOV3upUVKtrxy/I670Dg5F0wjI= +github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6 h1:rtOsOFPnz1bX6Z/Ejxv19PuDEMFxXElvGpTx5pbBkXU= +github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6/go.mod h1:Cc6AgGm/Ou9P6vdssCPRDfrpb9iKIhY2hiPcWX4aOrw= github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -785,6 +791,8 @@ github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJys github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo= github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/miguelmota/go-ethereum-hdwallet v0.1.1 h1:zdXGlHao7idpCBjEGTXThVAtMKs+IxAgivZ75xqkWK0= +github.com/miguelmota/go-ethereum-hdwallet v0.1.1/go.mod h1:f9m9uXokAHA6WNoYOPjj4AqjJS5pquQRiYYj/XSyPYc= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c/go.mod h1:0SQS9kMwD2VsyFEB++InYyBJroV/FRmBgcydeSUcJms= github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b h1:z78hV3sbSMAUoyUMM0I83AUIT6Hu17AWfgjzIbtrYFc= @@ -1048,6 +1056,28 @@ github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a/go.mod h1:RRCYJ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +github.com/tidwall/assert v0.1.0 h1:aWcKyRBUAdLoVebxo95N7+YZVTFF/ASTr7BN4sLP6XI= +github.com/tidwall/assert v0.1.0/go.mod h1:QLYtGyeqse53vuELQheYl9dngGCJQ+mTtlxcktb+Kj8= +github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg= +github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/tidwall/buntdb v1.2.10 h1:U/ebfkmYPBnyiNZIirUiWFcxA/mgzjbKlyPynFsPtyM= +github.com/tidwall/buntdb v1.2.10/go.mod h1:lZZrZUWzlyDJKlLQ6DKAy53LnG7m5kHyrEHvvcDmBpU= +github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= +github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/grect v0.1.4 h1:dA3oIgNgWdSspFzn1kS4S/RDpZFLrIxAZOdJKjYapOg= +github.com/tidwall/grect v0.1.4/go.mod h1:9FBsaYRaR0Tcy4UwefBX/UDcDcDy9V5jUcxHzv2jd5Q= +github.com/tidwall/lotsa v1.0.2 h1:dNVBH5MErdaQ/xd9s769R31/n2dXavsQ0Yf4TMEHHw8= +github.com/tidwall/lotsa v1.0.2/go.mod h1:X6NiU+4yHA3fE3Puvpnn1XMDrFZrE9JO2/w+UMuqgR8= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/rtred v0.1.2 h1:exmoQtOLvDoO8ud++6LwVsAMTu0KPzLTUrMln8u1yu8= +github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ= +github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaymYE= +github.com/tidwall/tinyqueue v0.1.1/go.mod h1:O/QNHwrnjqr6IHItYrzoHAKYhBkLI67Q096fQP5zMYw= github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= diff --git a/pkg/payments/payments_manager.go b/pkg/payments/payments_manager.go new file mode 100644 index 00000000..1547846e --- /dev/null +++ b/pkg/payments/payments_manager.go @@ -0,0 +1,273 @@ +// VulcanizeDB +// Copyright © 2023 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package payments + +import ( + "math/big" + "sync" + "time" + + "github.com/cerc-io/ipld-eth-server/v5/pkg/log" + "github.com/ethereum/go-ethereum/common" + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/statechannels/go-nitro/node/engine" + "github.com/statechannels/go-nitro/node/engine/chainservice" + "github.com/statechannels/go-nitro/node/engine/store" + "github.com/statechannels/go-nitro/payments" + + nitroNode "github.com/statechannels/go-nitro/node" + p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" + nitroTypes "github.com/statechannels/go-nitro/types" +) + +const ( + DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000 + DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins + DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000 + DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins + DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000 + DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL +) + +type Payment struct { + voucher payments.Voucher + amount *big.Int +} + +type paymentVoucher struct { + payer common.Address + voucherHash common.Hash +} + +// Struct representing payments manager +// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node +type PaymentsManager struct { + // Whether to run an in-process Nitro node + runInProcessNitroNode bool + + // In-process Nitro node; nil when runInProcessNitroNode is false + nitroNode *nitroNode.Node + + receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]] + + paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int] + + paymentListeners []chan paymentVoucher + + // Used to signal shutdown of the service + quitChan chan bool +} + +func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) { + // TODO: Implement + var err error + + pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode} + + if runInProcessNitroNode { + pm.nitroNode, err = initializeNitroNode() + if err != nil { + return PaymentsManager{}, err + } + } + + pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]]( + DEFAULT_LRU_CACHE_MAX_ACCOUNTS, + nil, + time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL, + ) + + pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int]( + DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS, + nil, + time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL, + ) + + // Load existing open payment channels with amount paid so far from the stored state + err = pm.loadPaymentChannels() + if err != nil { + return PaymentsManager{}, err + } + + return pm, nil +} + +func (pm *PaymentsManager) Start(wg *sync.WaitGroup) { + log.Info("starting payments manager") + + // Channel for received vouchers + var receivedVouchers <-chan payments.Voucher + if pm.runInProcessNitroNode { + receivedVouchers = pm.nitroNode.ReceivedVouchers() + } else { + // TODO: Get vouchers from an out-of-process go-nitro node + } + + wg.Add(1) + go func() { + defer wg.Done() + pm.run(receivedVouchers) + }() +} + +func (pm *PaymentsManager) Stop() error { + log.Info("stopping payments manager") + close(pm.quitChan) + return nil +} + +func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) { + log.Info("starting voucher subscription...") + for { + select { + case voucher := <-receivedVouchers: + payer, err := pm.getChannelCounterparty(voucher.ChannelId) + if err != nil { + // TODO: Handle + panic(err) + } + + paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId) + if !ok { + paidSoFar = big.NewInt(0) + } + + paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar) + pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount) + log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String()) + + paymentsMap, ok := pm.receivedPayments.Get(payer) + if !ok { + paymentsMap = expirable.NewLRU[common.Hash, Payment]( + DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT, + nil, + DEFAULT_LRU_CACHE_VOUCHER_TTL, + ) + + pm.receivedPayments.Add(payer, paymentsMap) + } + + voucherHash, err := voucher.Hash() + if err != nil { + // TODO: Handle + panic(err) + } + + paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount}) + + for _, listener := range pm.paymentListeners { + listener <- paymentVoucher{payer: payer, voucherHash: voucherHash} + } + case <-pm.quitChan: + log.Info("stopping voucher subscription loop") + // TODO: stop the nitro node if that means anything + return + } + } +} + +func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) { + associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId) + if err != nil { + return common.Address{}, err + } + + return associatedPaymentChannel.Balance.Payer, nil +} + +func (pm *PaymentsManager) loadPaymentChannels() error { + // TODO: Implement + return nil +} + +func initializeNitroNode() (*nitroNode.Node, error) { + // TODO: Configure + pkString := "" + useDurableStore := true + durableStoreFolder := "./data/nitro-store" + msgPort := 3005 + wsMsgPort := 5005 + chainUrl := "ws://127.0.0.1:8546" + chainStartBlock := uint64(0) + chainPk := "" + naAddress := "" + vpaAddress := "" + caAddress := "" + + chainAuthToken := "" + publicIp := "0.0.0.0" + + chainOpts := chainservice.ChainOpts{ + ChainUrl: chainUrl, + ChainStartBlock: chainStartBlock, + ChainAuthToken: chainAuthToken, + ChainPk: chainPk, + NaAddress: common.HexToAddress(naAddress), + VpaAddress: common.HexToAddress(vpaAddress), + CaAddress: common.HexToAddress(caAddress), + } + + storeOpts := store.StoreOpts{ + PkBytes: common.Hex2Bytes(pkString), + UseDurableStore: useDurableStore, + DurableStoreFolder: durableStoreFolder, + } + + peerSlice := []string{} + + messageOpts := p2pms.MessageOpts{ + PkBytes: common.Hex2Bytes(pkString), + TcpPort: msgPort, + WsMsgPort: wsMsgPort, + BootPeers: peerSlice, + PublicIp: publicIp, + } + + ourStore, err := store.NewStore(storeOpts) + if err != nil { + return nil, err + } + + log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort) + messageOpts.SCAddr = *ourStore.GetAddress() + messageService := p2pms.NewMessageService(messageOpts) + + // Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two + // gets passed as an argument when creating NewEthChainService + storeBlockNum, err := ourStore.GetLastBlockNumSeen() + if err != nil { + return nil, err + } + if storeBlockNum > chainOpts.ChainStartBlock { + chainOpts.ChainStartBlock = storeBlockNum + } + + log.Info("Initializing chain service...") + ourChain, err := chainservice.NewEthChainService(chainOpts) + if err != nil { + return nil, err + } + + node := nitroNode.New( + messageService, + ourChain, + ourStore, + &engine.PermissivePolicy{}, + ) + + return &node, nil +}