diff --git a/go.mod b/go.mod index eac05ac2..a700daf6 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/ethereum/go-ethereum v1.12.0 // TODO: Investigate github.com/google/uuid v1.3.0 github.com/graph-gophers/graphql-go v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/ipfs/go-cid v0.4.1 github.com/jmoiron/sqlx v1.3.5 github.com/joho/godotenv v1.4.0 @@ -90,7 +91,6 @@ require ( github.com/hashicorp/go-bexpr v0.1.12 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect - github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect diff --git a/pkg/payments/payments_manager.go b/pkg/payments/payments_manager.go index 1b6f3697..1547846e 100644 --- a/pkg/payments/payments_manager.go +++ b/pkg/payments/payments_manager.go @@ -17,21 +17,44 @@ package payments import ( + "math/big" "sync" + "time" "github.com/cerc-io/ipld-eth-server/v5/pkg/log" "github.com/ethereum/go-ethereum/common" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/statechannels/go-nitro/node/engine" "github.com/statechannels/go-nitro/node/engine/chainservice" "github.com/statechannels/go-nitro/node/engine/store" + "github.com/statechannels/go-nitro/payments" nitroNode "github.com/statechannels/go-nitro/node" p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service" + nitroTypes "github.com/statechannels/go-nitro/types" ) -// TODO: Implement +const ( + DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000 + DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins + DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000 + DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins + DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000 + DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL +) + +type Payment struct { + voucher payments.Voucher + amount *big.Int +} + +type paymentVoucher struct { + payer common.Address + voucherHash common.Hash +} + // Struct representing payments manager -// Maintains either an in-process or communication with an out-of-process Nitro node +// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node type PaymentsManager struct { // Whether to run an in-process Nitro node runInProcessNitroNode bool @@ -39,6 +62,12 @@ type PaymentsManager struct { // In-process Nitro node; nil when runInProcessNitroNode is false nitroNode *nitroNode.Node + receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]] + + paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int] + + paymentListeners []chan paymentVoucher + // Used to signal shutdown of the service quitChan chan bool } @@ -56,24 +85,115 @@ func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) { } } + pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]]( + DEFAULT_LRU_CACHE_MAX_ACCOUNTS, + nil, + time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL, + ) + + pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int]( + DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS, + nil, + time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL, + ) + + // Load existing open payment channels with amount paid so far from the stored state + err = pm.loadPaymentChannels() + if err != nil { + return PaymentsManager{}, err + } + return pm, nil } func (pm *PaymentsManager) Start(wg *sync.WaitGroup) { - // TODO: Implement + log.Info("starting payments manager") + + // Channel for received vouchers + var receivedVouchers <-chan payments.Voucher + if pm.runInProcessNitroNode { + receivedVouchers = pm.nitroNode.ReceivedVouchers() + } else { + // TODO: Get vouchers from an out-of-process go-nitro node + } + + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() - <-pm.quitChan + pm.run(receivedVouchers) }() } func (pm *PaymentsManager) Stop() error { - // TODO: Implement + log.Info("stopping payments manager") close(pm.quitChan) return nil } +func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) { + log.Info("starting voucher subscription...") + for { + select { + case voucher := <-receivedVouchers: + payer, err := pm.getChannelCounterparty(voucher.ChannelId) + if err != nil { + // TODO: Handle + panic(err) + } + + paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId) + if !ok { + paidSoFar = big.NewInt(0) + } + + paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar) + pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount) + log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String()) + + paymentsMap, ok := pm.receivedPayments.Get(payer) + if !ok { + paymentsMap = expirable.NewLRU[common.Hash, Payment]( + DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT, + nil, + DEFAULT_LRU_CACHE_VOUCHER_TTL, + ) + + pm.receivedPayments.Add(payer, paymentsMap) + } + + voucherHash, err := voucher.Hash() + if err != nil { + // TODO: Handle + panic(err) + } + + paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount}) + + for _, listener := range pm.paymentListeners { + listener <- paymentVoucher{payer: payer, voucherHash: voucherHash} + } + case <-pm.quitChan: + log.Info("stopping voucher subscription loop") + // TODO: stop the nitro node if that means anything + return + } + } +} + +func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) { + associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId) + if err != nil { + return common.Address{}, err + } + + return associatedPaymentChannel.Balance.Payer, nil +} + +func (pm *PaymentsManager) loadPaymentChannels() error { + // TODO: Implement + return nil +} + func initializeNitroNode() (*nitroNode.Node, error) { // TODO: Configure pkString := "" @@ -122,7 +242,7 @@ func initializeNitroNode() (*nitroNode.Node, error) { return nil, err } - log.Info("Initializing message service", "tcp port", messageOpts.TcpPort, "web socket port", messageOpts.WsMsgPort) + log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort) messageOpts.SCAddr = *ourStore.GetAddress() messageService := p2pms.NewMessageService(messageOpts)