// VulcanizeDB // Copyright © 2023 Vulcanize // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package payments import ( "math/big" "sync" "time" "github.com/cerc-io/ipld-eth-server/v5/pkg/log" "github.com/ethereum/go-ethereum/common" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/statechannels/go-nitro/node/engine" "github.com/statechannels/go-nitro/node/engine/chainservice" "github.com/statechannels/go-nitro/node/engine/store" "github.com/statechannels/go-nitro/payments" nitroNode "github.com/statechannels/go-nitro/node" p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" nitroTypes "github.com/statechannels/go-nitro/types" ) const ( DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000 DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000 DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000 DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL ) type Payment struct { voucher payments.Voucher amount *big.Int } type paymentVoucher struct { payer common.Address voucherHash common.Hash } // Struct representing payments manager // Maintains either an in-process Nitro node or communication with an out-of-process Nitro node type PaymentsManager struct { // Whether to run an in-process Nitro node runInProcessNitroNode bool // In-process Nitro node; nil when runInProcessNitroNode is false nitroNode *nitroNode.Node receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]] paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int] paymentListeners []chan paymentVoucher // Used to signal shutdown of the service quitChan chan bool } func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) { // TODO: Implement var err error pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode} if runInProcessNitroNode { pm.nitroNode, err = initializeNitroNode() if err != nil { return PaymentsManager{}, err } } pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]]( DEFAULT_LRU_CACHE_MAX_ACCOUNTS, nil, time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL, ) pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int]( DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS, nil, time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL, ) // Load existing open payment channels with amount paid so far from the stored state err = pm.loadPaymentChannels() if err != nil { return PaymentsManager{}, err } return pm, nil } func (pm *PaymentsManager) Start(wg *sync.WaitGroup) { log.Info("starting payments manager") // Channel for received vouchers var receivedVouchers <-chan payments.Voucher if pm.runInProcessNitroNode { receivedVouchers = pm.nitroNode.ReceivedVouchers() } else { // TODO: Get vouchers from an out-of-process go-nitro node } wg.Add(1) go func() { defer wg.Done() pm.run(receivedVouchers) }() } func (pm *PaymentsManager) Stop() error { log.Info("stopping payments manager") close(pm.quitChan) return nil } func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) { log.Info("starting voucher subscription...") for { select { case voucher := <-receivedVouchers: payer, err := pm.getChannelCounterparty(voucher.ChannelId) if err != nil { // TODO: Handle panic(err) } paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId) if !ok { paidSoFar = big.NewInt(0) } paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar) pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount) log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String()) paymentsMap, ok := pm.receivedPayments.Get(payer) if !ok { paymentsMap = expirable.NewLRU[common.Hash, Payment]( DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT, nil, DEFAULT_LRU_CACHE_VOUCHER_TTL, ) pm.receivedPayments.Add(payer, paymentsMap) } voucherHash, err := voucher.Hash() if err != nil { // TODO: Handle panic(err) } paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount}) for _, listener := range pm.paymentListeners { listener <- paymentVoucher{payer: payer, voucherHash: voucherHash} } case <-pm.quitChan: log.Info("stopping voucher subscription loop") // TODO: stop the nitro node if that means anything return } } } func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) { associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId) if err != nil { return common.Address{}, err } return associatedPaymentChannel.Balance.Payer, nil } func (pm *PaymentsManager) loadPaymentChannels() error { // TODO: Implement return nil } func initializeNitroNode() (*nitroNode.Node, error) { // TODO: Configure pkString := "" useDurableStore := true durableStoreFolder := "./data/nitro-store" msgPort := 3005 wsMsgPort := 5005 chainUrl := "ws://127.0.0.1:8546" chainStartBlock := uint64(0) chainPk := "" naAddress := "" vpaAddress := "" caAddress := "" chainAuthToken := "" publicIp := "0.0.0.0" chainOpts := chainservice.ChainOpts{ ChainUrl: chainUrl, ChainStartBlock: chainStartBlock, ChainAuthToken: chainAuthToken, ChainPk: chainPk, NaAddress: common.HexToAddress(naAddress), VpaAddress: common.HexToAddress(vpaAddress), CaAddress: common.HexToAddress(caAddress), } storeOpts := store.StoreOpts{ PkBytes: common.Hex2Bytes(pkString), UseDurableStore: useDurableStore, DurableStoreFolder: durableStoreFolder, } peerSlice := []string{} messageOpts := p2pms.MessageOpts{ PkBytes: common.Hex2Bytes(pkString), TcpPort: msgPort, WsMsgPort: wsMsgPort, BootPeers: peerSlice, PublicIp: publicIp, } ourStore, err := store.NewStore(storeOpts) if err != nil { return nil, err } log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort) messageOpts.SCAddr = *ourStore.GetAddress() messageService := p2pms.NewMessageService(messageOpts) // Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two // gets passed as an argument when creating NewEthChainService storeBlockNum, err := ourStore.GetLastBlockNumSeen() if err != nil { return nil, err } if storeBlockNum > chainOpts.ChainStartBlock { chainOpts.ChainStartBlock = storeBlockNum } log.Info("Initializing chain service...") ourChain, err := chainservice.NewEthChainService(chainOpts) if err != nil { return nil, err } node := nitroNode.New( messageService, ourChain, ourStore, &engine.PermissivePolicy{}, ) return &node, nil }