315 lines
8.7 KiB
Go
315 lines
8.7 KiB
Go
// VulcanizeDB
|
|
// Copyright © 2023 Vulcanize
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package payments
|
|
|
|
import (
|
|
"errors"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
|
"github.com/statechannels/go-nitro/node/engine"
|
|
"github.com/statechannels/go-nitro/node/engine/chainservice"
|
|
"github.com/statechannels/go-nitro/node/engine/store"
|
|
"github.com/statechannels/go-nitro/payments"
|
|
|
|
nitroNode "github.com/statechannels/go-nitro/node"
|
|
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
|
|
nitroTypes "github.com/statechannels/go-nitro/types"
|
|
)
|
|
|
|
const (
|
|
DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000
|
|
DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins
|
|
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000
|
|
DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins
|
|
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000
|
|
DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL
|
|
|
|
DEFAULT_VOUCHER_CHECK_INTERVAL = 2
|
|
DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5
|
|
)
|
|
|
|
var (
|
|
ERR_PAYMENT_NOT_RECEIVED = errors.New("Payment not received")
|
|
ERR_AMOUNT_INSUFFICIENT = errors.New("Payment amount insufficient")
|
|
)
|
|
|
|
type Payment struct {
|
|
voucher payments.Voucher
|
|
amount *big.Int
|
|
}
|
|
|
|
// Struct representing payments manager
|
|
// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node
|
|
type PaymentsManager struct {
|
|
// Whether to run an in-process Nitro node
|
|
runInProcessNitroNode bool
|
|
|
|
// In-process Nitro node; nil when runInProcessNitroNode is false
|
|
nitroNode *nitroNode.Node
|
|
|
|
receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]]
|
|
|
|
paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int]
|
|
|
|
// Used to signal shutdown of the service
|
|
quitChan chan bool
|
|
}
|
|
|
|
func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) {
|
|
var err error
|
|
|
|
pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode}
|
|
|
|
if runInProcessNitroNode {
|
|
pm.nitroNode, err = initializeNitroNode()
|
|
if err != nil {
|
|
return PaymentsManager{}, err
|
|
}
|
|
}
|
|
|
|
pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]](
|
|
DEFAULT_LRU_CACHE_MAX_ACCOUNTS,
|
|
nil,
|
|
time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL,
|
|
)
|
|
|
|
pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int](
|
|
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS,
|
|
nil,
|
|
time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL,
|
|
)
|
|
|
|
// Load existing open payment channels with amount paid so far from the stored state
|
|
err = pm.loadPaymentChannels()
|
|
if err != nil {
|
|
return PaymentsManager{}, err
|
|
}
|
|
|
|
return pm, nil
|
|
}
|
|
|
|
func (pm *PaymentsManager) Start(wg *sync.WaitGroup) {
|
|
log.Info("starting payments manager")
|
|
|
|
// Channel for received vouchers
|
|
var receivedVouchers <-chan payments.Voucher
|
|
if pm.runInProcessNitroNode {
|
|
receivedVouchers = pm.nitroNode.ReceivedVouchers()
|
|
} else {
|
|
// TODO: Get vouchers from an out-of-process go-nitro node
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
pm.run(receivedVouchers)
|
|
}()
|
|
}
|
|
|
|
func (pm *PaymentsManager) Stop() error {
|
|
log.Info("stopping payments manager")
|
|
close(pm.quitChan)
|
|
return nil
|
|
}
|
|
|
|
func (pm *PaymentsManager) AuthenticatePayment(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error {
|
|
// Check the payments map for required voucher
|
|
var isPaymentReceived, isOfSufficientValue bool
|
|
for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ {
|
|
isPaymentReceived, isOfSufficientValue = pm.acceptReceivedPayment(voucherHash, signerAddress, value)
|
|
|
|
if isPaymentReceived {
|
|
if !isOfSufficientValue {
|
|
return ERR_AMOUNT_INSUFFICIENT
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Retry after an interval if voucher not found
|
|
log.Debugf("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL)
|
|
time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second)
|
|
}
|
|
|
|
return ERR_PAYMENT_NOT_RECEIVED
|
|
}
|
|
|
|
// Check for a given payment voucher in LRU cache map
|
|
// Returns whether the voucher was found, whether it was of sufficient value
|
|
func (pm *PaymentsManager) acceptReceivedPayment(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) {
|
|
paymentsMap, ok := pm.receivedPayments.Get(signerAddress)
|
|
if !ok {
|
|
return false, false
|
|
}
|
|
|
|
receivedPayment, ok := paymentsMap.Get(voucherHash)
|
|
if !ok {
|
|
return false, false
|
|
}
|
|
|
|
if receivedPayment.amount.Cmp(minRequiredValue) < 0 {
|
|
return true, false
|
|
}
|
|
|
|
// Delete the voucher from map after consuming it
|
|
paymentsMap.Remove(voucherHash)
|
|
return true, true
|
|
}
|
|
|
|
func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) {
|
|
log.Info("starting voucher subscription...")
|
|
for {
|
|
select {
|
|
case voucher := <-receivedVouchers:
|
|
payer, err := pm.getChannelCounterparty(voucher.ChannelId)
|
|
if err != nil {
|
|
// TODO: Handle
|
|
panic(err)
|
|
}
|
|
|
|
paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId)
|
|
if !ok {
|
|
paidSoFar = big.NewInt(0)
|
|
}
|
|
|
|
paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar)
|
|
pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount)
|
|
log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String())
|
|
|
|
paymentsMap, ok := pm.receivedPayments.Get(payer)
|
|
if !ok {
|
|
paymentsMap = expirable.NewLRU[common.Hash, Payment](
|
|
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT,
|
|
nil,
|
|
DEFAULT_LRU_CACHE_VOUCHER_TTL,
|
|
)
|
|
|
|
pm.receivedPayments.Add(payer, paymentsMap)
|
|
}
|
|
|
|
voucherHash, err := voucher.Hash()
|
|
if err != nil {
|
|
// TODO: Handle
|
|
panic(err)
|
|
}
|
|
|
|
paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount})
|
|
case <-pm.quitChan:
|
|
log.Info("stopping voucher subscription loop")
|
|
// TODO: stop the nitro node if that means anything
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) {
|
|
associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId)
|
|
if err != nil {
|
|
return common.Address{}, err
|
|
}
|
|
|
|
return associatedPaymentChannel.Balance.Payer, nil
|
|
}
|
|
|
|
func (pm *PaymentsManager) loadPaymentChannels() error {
|
|
// TODO: Implement
|
|
return nil
|
|
}
|
|
|
|
func initializeNitroNode() (*nitroNode.Node, error) {
|
|
// TODO: Configure
|
|
pkString := ""
|
|
useDurableStore := true
|
|
durableStoreFolder := "./data/nitro-store"
|
|
msgPort := 3005
|
|
wsMsgPort := 5005
|
|
chainUrl := "ws://127.0.0.1:8546"
|
|
chainStartBlock := uint64(0)
|
|
chainPk := ""
|
|
naAddress := ""
|
|
vpaAddress := ""
|
|
caAddress := ""
|
|
|
|
chainAuthToken := ""
|
|
publicIp := "0.0.0.0"
|
|
|
|
chainOpts := chainservice.ChainOpts{
|
|
ChainUrl: chainUrl,
|
|
ChainStartBlock: chainStartBlock,
|
|
ChainAuthToken: chainAuthToken,
|
|
ChainPk: chainPk,
|
|
NaAddress: common.HexToAddress(naAddress),
|
|
VpaAddress: common.HexToAddress(vpaAddress),
|
|
CaAddress: common.HexToAddress(caAddress),
|
|
}
|
|
|
|
storeOpts := store.StoreOpts{
|
|
PkBytes: common.Hex2Bytes(pkString),
|
|
UseDurableStore: useDurableStore,
|
|
DurableStoreFolder: durableStoreFolder,
|
|
}
|
|
|
|
peerSlice := []string{}
|
|
|
|
messageOpts := p2pms.MessageOpts{
|
|
PkBytes: common.Hex2Bytes(pkString),
|
|
TcpPort: msgPort,
|
|
WsMsgPort: wsMsgPort,
|
|
BootPeers: peerSlice,
|
|
PublicIp: publicIp,
|
|
}
|
|
|
|
ourStore, err := store.NewStore(storeOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort)
|
|
messageOpts.SCAddr = *ourStore.GetAddress()
|
|
messageService := p2pms.NewMessageService(messageOpts)
|
|
|
|
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
|
|
// gets passed as an argument when creating NewEthChainService
|
|
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if storeBlockNum > chainOpts.ChainStartBlock {
|
|
chainOpts.ChainStartBlock = storeBlockNum
|
|
}
|
|
|
|
log.Info("Initializing chain service...")
|
|
ourChain, err := chainservice.NewEthChainService(chainOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
node := nitroNode.New(
|
|
messageService,
|
|
ourChain,
|
|
ourStore,
|
|
&engine.PermissivePolicy{},
|
|
)
|
|
|
|
return &node, nil
|
|
}
|