Setup in-process go-nitro node for payments
This commit is contained in:
parent
8c2f71f655
commit
1af95d35ab
118
cmd/serve.go
118
cmd/serve.go
@ -17,6 +17,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"math/big"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
@ -26,16 +27,22 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/payments"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
"github.com/mailgun/groupcache/v2"
|
"github.com/mailgun/groupcache/v2"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
"github.com/statechannels/go-nitro/node/engine"
|
||||||
|
"github.com/statechannels/go-nitro/node/engine/chainservice"
|
||||||
|
"github.com/statechannels/go-nitro/node/engine/store"
|
||||||
|
"github.com/statechannels/go-nitro/paymentsmanager"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/graphql"
|
"github.com/cerc-io/ipld-eth-server/v5/pkg/graphql"
|
||||||
srpc "github.com/cerc-io/ipld-eth-server/v5/pkg/rpc"
|
srpc "github.com/cerc-io/ipld-eth-server/v5/pkg/rpc"
|
||||||
s "github.com/cerc-io/ipld-eth-server/v5/pkg/serve"
|
s "github.com/cerc-io/ipld-eth-server/v5/pkg/serve"
|
||||||
v "github.com/cerc-io/ipld-eth-server/v5/version"
|
v "github.com/cerc-io/ipld-eth-server/v5/version"
|
||||||
|
nitroNode "github.com/statechannels/go-nitro/node"
|
||||||
|
nitrop2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrNoRpcEndpoints = errors.New("no rpc endpoints is available")
|
var ErrNoRpcEndpoints = errors.New("no rpc endpoints is available")
|
||||||
@ -76,7 +83,24 @@ func serve() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
server.Serve(wg)
|
server.Serve(wg)
|
||||||
if err := startServers(server, serverConfig); err != nil {
|
|
||||||
|
// TODO: Create required config for Nitro node
|
||||||
|
nitroNode, err := initializeNitroNode()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pm, err := paymentsmanager.NewPaymentsManager(nitroNode)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pm.Start(wg)
|
||||||
|
|
||||||
|
voucherValidator := paymentsmanager.InProcessValidator{PaymentsManager: pm}
|
||||||
|
queryRates := map[string]*big.Int{"eth_getBlockByNumber": big.NewInt(50)}
|
||||||
|
|
||||||
|
if err := startServers(server, serverConfig, voucherValidator, queryRates); err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
graphQL, err := startEthGraphQL(server, serverConfig)
|
graphQL, err := startEthGraphQL(server, serverConfig)
|
||||||
@ -96,10 +120,8 @@ func serve() {
|
|||||||
logWithCommand.Debug("state validator disabled")
|
logWithCommand.Debug("state validator disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Create required config for Nitro node
|
// paymentsManager, _ := payments.NewPaymentsManager(true)
|
||||||
|
// paymentsManager.Start(wg)
|
||||||
paymentsManager, _ := payments.NewPaymentsManager(true)
|
|
||||||
paymentsManager.Start(wg)
|
|
||||||
|
|
||||||
shutdown := make(chan os.Signal, 1)
|
shutdown := make(chan os.Signal, 1)
|
||||||
signal.Notify(shutdown, os.Interrupt)
|
signal.Notify(shutdown, os.Interrupt)
|
||||||
@ -108,12 +130,12 @@ func serve() {
|
|||||||
graphQL.Stop()
|
graphQL.Stop()
|
||||||
}
|
}
|
||||||
server.Stop()
|
server.Stop()
|
||||||
paymentsManager.Stop()
|
pm.Stop()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func startServers(server s.Server, settings *s.Config) error {
|
func startServers(server s.Server, settings *s.Config, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) error {
|
||||||
if settings.IPCEnabled {
|
if settings.IPCEnabled {
|
||||||
logWithCommand.Debug("starting up IPC server")
|
logWithCommand.Debug("starting up IPC server")
|
||||||
_, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
|
_, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
|
||||||
@ -136,7 +158,7 @@ func startServers(server s.Server, settings *s.Config) error {
|
|||||||
|
|
||||||
if settings.HTTPEnabled {
|
if settings.HTTPEnabled {
|
||||||
logWithCommand.Debug("starting up HTTP server")
|
logWithCommand.Debug("starting up HTTP server")
|
||||||
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
|
_, err := srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"vdb", "eth", "debug", "net"}, nil, []string{"*"}, rpc.HTTPTimeouts{}, voucherValidator, queryRates)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -347,3 +369,81 @@ func init() {
|
|||||||
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
|
viper.BindPFlag("validator.enabled", serveCmd.PersistentFlags().Lookup("validator-enabled"))
|
||||||
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
|
viper.BindPFlag("validator.everyNthBlock", serveCmd.PersistentFlags().Lookup("validator-every-nth-block"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initializeNitroNode() (*nitroNode.Node, error) {
|
||||||
|
// TODO: Configure
|
||||||
|
pkString := ""
|
||||||
|
useDurableStore := true
|
||||||
|
durableStoreFolder := "./data/nitro-store"
|
||||||
|
msgPort := 3005
|
||||||
|
wsMsgPort := 5005
|
||||||
|
chainUrl := "ws://127.0.0.1:8545"
|
||||||
|
chainStartBlock := uint64(0)
|
||||||
|
chainPk := ""
|
||||||
|
naAddress := ""
|
||||||
|
vpaAddress := ""
|
||||||
|
caAddress := ""
|
||||||
|
|
||||||
|
chainAuthToken := ""
|
||||||
|
publicIp := "0.0.0.0"
|
||||||
|
|
||||||
|
chainOpts := chainservice.ChainOpts{
|
||||||
|
ChainUrl: chainUrl,
|
||||||
|
ChainStartBlock: chainStartBlock,
|
||||||
|
ChainAuthToken: chainAuthToken,
|
||||||
|
ChainPk: chainPk,
|
||||||
|
NaAddress: common.HexToAddress(naAddress),
|
||||||
|
VpaAddress: common.HexToAddress(vpaAddress),
|
||||||
|
CaAddress: common.HexToAddress(caAddress),
|
||||||
|
}
|
||||||
|
|
||||||
|
storeOpts := store.StoreOpts{
|
||||||
|
PkBytes: common.Hex2Bytes(pkString),
|
||||||
|
UseDurableStore: useDurableStore,
|
||||||
|
DurableStoreFolder: durableStoreFolder,
|
||||||
|
}
|
||||||
|
|
||||||
|
peerSlice := []string{}
|
||||||
|
|
||||||
|
messageOpts := nitrop2pms.MessageOpts{
|
||||||
|
PkBytes: common.Hex2Bytes(pkString),
|
||||||
|
TcpPort: msgPort,
|
||||||
|
WsMsgPort: wsMsgPort,
|
||||||
|
BootPeers: peerSlice,
|
||||||
|
PublicIp: publicIp,
|
||||||
|
}
|
||||||
|
|
||||||
|
ourStore, err := store.NewStore(storeOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort)
|
||||||
|
messageOpts.SCAddr = *ourStore.GetAddress()
|
||||||
|
messageService := nitrop2pms.NewMessageService(messageOpts)
|
||||||
|
|
||||||
|
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
|
||||||
|
// gets passed as an argument when creating NewEthChainService
|
||||||
|
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if storeBlockNum > chainOpts.ChainStartBlock {
|
||||||
|
chainOpts.ChainStartBlock = storeBlockNum
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Initializing chain service...")
|
||||||
|
ourChain, err := chainservice.NewEthChainService(chainOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
node := nitroNode.New(
|
||||||
|
messageService,
|
||||||
|
ourChain,
|
||||||
|
ourStore,
|
||||||
|
&engine.PermissivePolicy{},
|
||||||
|
)
|
||||||
|
|
||||||
|
return &node, nil
|
||||||
|
}
|
||||||
|
3
go.mod
3
go.mod
@ -79,6 +79,7 @@ require (
|
|||||||
github.com/gofrs/flock v0.8.1 // indirect
|
github.com/gofrs/flock v0.8.1 // indirect
|
||||||
github.com/gogo/protobuf v1.3.2 // indirect
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
|
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
|
||||||
|
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
|
||||||
github.com/golang/mock v1.6.0 // indirect
|
github.com/golang/mock v1.6.0 // indirect
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
|
||||||
@ -299,4 +300,4 @@ replace (
|
|||||||
github.com/cerc-io/ipfs-ethdb/v5 => git.vdb.to/deep-stack/ipfs-ethdb/v5 v5.0.1-alpha.0.20231003124335-b6cf70668a07
|
github.com/cerc-io/ipfs-ethdb/v5 => git.vdb.to/deep-stack/ipfs-ethdb/v5 v5.0.1-alpha.0.20231003124335-b6cf70668a07
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/statechannels/go-nitro v0.1.1 => github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6
|
replace github.com/statechannels/go-nitro v0.1.1 => github.com/cerc-io/go-nitro v0.1.2-0.20231012043644-1b31b5133214
|
||||||
|
6
go.sum
6
go.sum
@ -116,8 +116,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
|
|||||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||||
github.com/ceramicnetwork/go-dag-jose v0.1.0 h1:yJ/HVlfKpnD3LdYP03AHyTvbm3BpPiz2oZiOeReJRdU=
|
github.com/ceramicnetwork/go-dag-jose v0.1.0 h1:yJ/HVlfKpnD3LdYP03AHyTvbm3BpPiz2oZiOeReJRdU=
|
||||||
github.com/ceramicnetwork/go-dag-jose v0.1.0/go.mod h1:qYA1nYt0X8u4XoMAVoOV3upUVKtrxy/I670Dg5F0wjI=
|
github.com/ceramicnetwork/go-dag-jose v0.1.0/go.mod h1:qYA1nYt0X8u4XoMAVoOV3upUVKtrxy/I670Dg5F0wjI=
|
||||||
github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6 h1:rtOsOFPnz1bX6Z/Ejxv19PuDEMFxXElvGpTx5pbBkXU=
|
github.com/cerc-io/go-nitro v0.1.2-0.20231012043644-1b31b5133214 h1:nrkG2Rym9jOpIFD9MyVLTkfrxyG3Uf0ibusIHeQDKQk=
|
||||||
github.com/cerc-io/go-nitro v0.1.1-ts-port-0.1.6/go.mod h1:Cc6AgGm/Ou9P6vdssCPRDfrpb9iKIhY2hiPcWX4aOrw=
|
github.com/cerc-io/go-nitro v0.1.2-0.20231012043644-1b31b5133214/go.mod h1:gkKL37JcSo54ybLTI6VJRmP75bWEu9i1kc9RYmQLp+I=
|
||||||
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
|
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
|
||||||
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
|
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
|
||||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||||
@ -292,6 +292,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
|||||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||||
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
|
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
|
||||||
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||||
|
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
|
||||||
|
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
||||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
|
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||||
|
@ -1,314 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2023 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package payments
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"math/big"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
|
||||||
"github.com/statechannels/go-nitro/node/engine"
|
|
||||||
"github.com/statechannels/go-nitro/node/engine/chainservice"
|
|
||||||
"github.com/statechannels/go-nitro/node/engine/store"
|
|
||||||
"github.com/statechannels/go-nitro/payments"
|
|
||||||
|
|
||||||
nitroNode "github.com/statechannels/go-nitro/node"
|
|
||||||
p2pms "github.com/statechannels/go-nitro/node/engine/messageservice/p2p-message-service"
|
|
||||||
nitroTypes "github.com/statechannels/go-nitro/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
DEFAULT_LRU_CACHE_MAX_ACCOUNTS = 1000
|
|
||||||
DEFAULT_LRU_CACHE_ACCOUNT_TTL = 30 * 60 // 30mins
|
|
||||||
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT = 1000
|
|
||||||
DEFAULT_LRU_CACHE_VOUCHER_TTL = 5 * 60 // 5mins
|
|
||||||
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS = 10000
|
|
||||||
DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL = DEFAULT_LRU_CACHE_ACCOUNT_TTL
|
|
||||||
|
|
||||||
DEFAULT_VOUCHER_CHECK_INTERVAL = 2
|
|
||||||
DEFAULT_VOUCHER_CHECK_ATTEMPTS = 5
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ERR_PAYMENT_NOT_RECEIVED = errors.New("Payment not received")
|
|
||||||
ERR_AMOUNT_INSUFFICIENT = errors.New("Payment amount insufficient")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Payment struct {
|
|
||||||
voucher payments.Voucher
|
|
||||||
amount *big.Int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Struct representing payments manager
|
|
||||||
// Maintains either an in-process Nitro node or communication with an out-of-process Nitro node
|
|
||||||
type PaymentsManager struct {
|
|
||||||
// Whether to run an in-process Nitro node
|
|
||||||
runInProcessNitroNode bool
|
|
||||||
|
|
||||||
// In-process Nitro node; nil when runInProcessNitroNode is false
|
|
||||||
nitroNode *nitroNode.Node
|
|
||||||
|
|
||||||
receivedPayments *expirable.LRU[common.Address, *expirable.LRU[common.Hash, Payment]]
|
|
||||||
|
|
||||||
paidSoFarOnChannel *expirable.LRU[nitroTypes.Destination, *big.Int]
|
|
||||||
|
|
||||||
// Used to signal shutdown of the service
|
|
||||||
quitChan chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPaymentsManager(runInProcessNitroNode bool) (PaymentsManager, error) {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
pm := PaymentsManager{runInProcessNitroNode: runInProcessNitroNode}
|
|
||||||
|
|
||||||
if runInProcessNitroNode {
|
|
||||||
pm.nitroNode, err = initializeNitroNode()
|
|
||||||
if err != nil {
|
|
||||||
return PaymentsManager{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pm.receivedPayments = expirable.NewLRU[common.Address, *expirable.LRU[common.Hash, Payment]](
|
|
||||||
DEFAULT_LRU_CACHE_MAX_ACCOUNTS,
|
|
||||||
nil,
|
|
||||||
time.Second*DEFAULT_LRU_CACHE_ACCOUNT_TTL,
|
|
||||||
)
|
|
||||||
|
|
||||||
pm.paidSoFarOnChannel = expirable.NewLRU[nitroTypes.Destination, *big.Int](
|
|
||||||
DEFAULT_LRU_CACHE_MAX_PAYMENT_CHANNELS,
|
|
||||||
nil,
|
|
||||||
time.Second*DEFAULT_LRU_CACHE_PAYMENT_CHANNEL_TTL,
|
|
||||||
)
|
|
||||||
|
|
||||||
// Load existing open payment channels with amount paid so far from the stored state
|
|
||||||
err = pm.loadPaymentChannels()
|
|
||||||
if err != nil {
|
|
||||||
return PaymentsManager{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return pm, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) Start(wg *sync.WaitGroup) {
|
|
||||||
log.Info("starting payments manager")
|
|
||||||
|
|
||||||
// Channel for received vouchers
|
|
||||||
var receivedVouchers <-chan payments.Voucher
|
|
||||||
if pm.runInProcessNitroNode {
|
|
||||||
receivedVouchers = pm.nitroNode.ReceivedVouchers()
|
|
||||||
} else {
|
|
||||||
// TODO: Get vouchers from an out-of-process go-nitro node
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
pm.run(receivedVouchers)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) Stop() error {
|
|
||||||
log.Info("stopping payments manager")
|
|
||||||
close(pm.quitChan)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) AuthenticatePayment(voucherHash common.Hash, signerAddress common.Address, value *big.Int) error {
|
|
||||||
// Check the payments map for required voucher
|
|
||||||
var isPaymentReceived, isOfSufficientValue bool
|
|
||||||
for i := 0; i < DEFAULT_VOUCHER_CHECK_ATTEMPTS; i++ {
|
|
||||||
isPaymentReceived, isOfSufficientValue = pm.acceptReceivedPayment(voucherHash, signerAddress, value)
|
|
||||||
|
|
||||||
if isPaymentReceived {
|
|
||||||
if !isOfSufficientValue {
|
|
||||||
return ERR_AMOUNT_INSUFFICIENT
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retry after an interval if voucher not found
|
|
||||||
log.Debugf("Payment from %s not found, retrying after %d sec...", signerAddress, DEFAULT_VOUCHER_CHECK_INTERVAL)
|
|
||||||
time.Sleep(DEFAULT_VOUCHER_CHECK_INTERVAL * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ERR_PAYMENT_NOT_RECEIVED
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for a given payment voucher in LRU cache map
|
|
||||||
// Returns whether the voucher was found, whether it was of sufficient value
|
|
||||||
func (pm *PaymentsManager) acceptReceivedPayment(voucherHash common.Hash, signerAddress common.Address, minRequiredValue *big.Int) (bool, bool) {
|
|
||||||
paymentsMap, ok := pm.receivedPayments.Get(signerAddress)
|
|
||||||
if !ok {
|
|
||||||
return false, false
|
|
||||||
}
|
|
||||||
|
|
||||||
receivedPayment, ok := paymentsMap.Get(voucherHash)
|
|
||||||
if !ok {
|
|
||||||
return false, false
|
|
||||||
}
|
|
||||||
|
|
||||||
if receivedPayment.amount.Cmp(minRequiredValue) < 0 {
|
|
||||||
return true, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the voucher from map after consuming it
|
|
||||||
paymentsMap.Remove(voucherHash)
|
|
||||||
return true, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) run(receivedVouchers <-chan payments.Voucher) {
|
|
||||||
log.Info("starting voucher subscription...")
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case voucher := <-receivedVouchers:
|
|
||||||
payer, err := pm.getChannelCounterparty(voucher.ChannelId)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: Handle
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
paidSoFar, ok := pm.paidSoFarOnChannel.Get(voucher.ChannelId)
|
|
||||||
if !ok {
|
|
||||||
paidSoFar = big.NewInt(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
paymentAmount := big.NewInt(0).Sub(voucher.Amount, paidSoFar)
|
|
||||||
pm.paidSoFarOnChannel.Add(voucher.ChannelId, voucher.Amount)
|
|
||||||
log.Infof("Received a payment voucher of %s from %s", paymentAmount.String(), payer.String())
|
|
||||||
|
|
||||||
paymentsMap, ok := pm.receivedPayments.Get(payer)
|
|
||||||
if !ok {
|
|
||||||
paymentsMap = expirable.NewLRU[common.Hash, Payment](
|
|
||||||
DEFAULT_LRU_CACHE_MAX_VOUCHERS_PER_ACCOUNT,
|
|
||||||
nil,
|
|
||||||
DEFAULT_LRU_CACHE_VOUCHER_TTL,
|
|
||||||
)
|
|
||||||
|
|
||||||
pm.receivedPayments.Add(payer, paymentsMap)
|
|
||||||
}
|
|
||||||
|
|
||||||
voucherHash, err := voucher.Hash()
|
|
||||||
if err != nil {
|
|
||||||
// TODO: Handle
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
paymentsMap.Add(voucherHash, Payment{voucher: voucher, amount: paymentAmount})
|
|
||||||
case <-pm.quitChan:
|
|
||||||
log.Info("stopping voucher subscription loop")
|
|
||||||
// TODO: stop the nitro node if that means anything
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) getChannelCounterparty(channelId nitroTypes.Destination) (common.Address, error) {
|
|
||||||
associatedPaymentChannel, err := pm.nitroNode.GetPaymentChannel(channelId)
|
|
||||||
if err != nil {
|
|
||||||
return common.Address{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return associatedPaymentChannel.Balance.Payer, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pm *PaymentsManager) loadPaymentChannels() error {
|
|
||||||
// TODO: Implement
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func initializeNitroNode() (*nitroNode.Node, error) {
|
|
||||||
// TODO: Configure
|
|
||||||
pkString := ""
|
|
||||||
useDurableStore := true
|
|
||||||
durableStoreFolder := "./data/nitro-store"
|
|
||||||
msgPort := 3005
|
|
||||||
wsMsgPort := 5005
|
|
||||||
chainUrl := "ws://127.0.0.1:8546"
|
|
||||||
chainStartBlock := uint64(0)
|
|
||||||
chainPk := ""
|
|
||||||
naAddress := ""
|
|
||||||
vpaAddress := ""
|
|
||||||
caAddress := ""
|
|
||||||
|
|
||||||
chainAuthToken := ""
|
|
||||||
publicIp := "0.0.0.0"
|
|
||||||
|
|
||||||
chainOpts := chainservice.ChainOpts{
|
|
||||||
ChainUrl: chainUrl,
|
|
||||||
ChainStartBlock: chainStartBlock,
|
|
||||||
ChainAuthToken: chainAuthToken,
|
|
||||||
ChainPk: chainPk,
|
|
||||||
NaAddress: common.HexToAddress(naAddress),
|
|
||||||
VpaAddress: common.HexToAddress(vpaAddress),
|
|
||||||
CaAddress: common.HexToAddress(caAddress),
|
|
||||||
}
|
|
||||||
|
|
||||||
storeOpts := store.StoreOpts{
|
|
||||||
PkBytes: common.Hex2Bytes(pkString),
|
|
||||||
UseDurableStore: useDurableStore,
|
|
||||||
DurableStoreFolder: durableStoreFolder,
|
|
||||||
}
|
|
||||||
|
|
||||||
peerSlice := []string{}
|
|
||||||
|
|
||||||
messageOpts := p2pms.MessageOpts{
|
|
||||||
PkBytes: common.Hex2Bytes(pkString),
|
|
||||||
TcpPort: msgPort,
|
|
||||||
WsMsgPort: wsMsgPort,
|
|
||||||
BootPeers: peerSlice,
|
|
||||||
PublicIp: publicIp,
|
|
||||||
}
|
|
||||||
|
|
||||||
ourStore, err := store.NewStore(storeOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Initializing message service...", " tcp port=", messageOpts.TcpPort, " web socket port=", messageOpts.WsMsgPort)
|
|
||||||
messageOpts.SCAddr = *ourStore.GetAddress()
|
|
||||||
messageService := p2pms.NewMessageService(messageOpts)
|
|
||||||
|
|
||||||
// Compare chainOpts.ChainStartBlock to lastBlockNum seen in store. The larger of the two
|
|
||||||
// gets passed as an argument when creating NewEthChainService
|
|
||||||
storeBlockNum, err := ourStore.GetLastBlockNumSeen()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if storeBlockNum > chainOpts.ChainStartBlock {
|
|
||||||
chainOpts.ChainStartBlock = storeBlockNum
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Initializing chain service...")
|
|
||||||
ourChain, err := chainservice.NewEthChainService(chainOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
node := nitroNode.New(
|
|
||||||
messageService,
|
|
||||||
ourChain,
|
|
||||||
ourStore,
|
|
||||||
&engine.PermissivePolicy{},
|
|
||||||
)
|
|
||||||
|
|
||||||
return &node, nil
|
|
||||||
}
|
|
@ -18,27 +18,31 @@ package rpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
"github.com/cerc-io/ipld-eth-server/v5/pkg/log"
|
||||||
"github.com/ethereum/go-ethereum/cmd/utils"
|
"github.com/ethereum/go-ethereum/cmd/utils"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
"github.com/statechannels/go-nitro/paymentsmanager"
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v5/pkg/prom"
|
"github.com/cerc-io/ipld-eth-server/v5/pkg/prom"
|
||||||
)
|
)
|
||||||
|
|
||||||
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
|
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
|
||||||
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) {
|
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, voucherValidator paymentsmanager.VoucherValidator, queryRates map[string]*big.Int) (*rpc.Server, error) {
|
||||||
|
|
||||||
srv := rpc.NewServer()
|
srv := rpc.NewServer()
|
||||||
err := node.RegisterApis(apis, modules, srv)
|
err := node.RegisterApis(apis, modules, srv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utils.Fatalf("Could not register HTTP API: %w", err)
|
utils.Fatalf("Could not register HTTP API: %w", err)
|
||||||
}
|
}
|
||||||
handler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil))
|
|
||||||
|
promHandler := prom.HTTPMiddleware(node.NewHTTPHandlerStack(srv, cors, vhosts, nil))
|
||||||
|
paymentHandler := paymentsmanager.HTTPMiddleware(promHandler, voucherValidator, queryRates)
|
||||||
|
|
||||||
// start http server
|
// start http server
|
||||||
_, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, handler)
|
// request -> payments -> metrics -> server
|
||||||
|
_, addr, err := node.StartHTTPEndpoint(endpoint, rpc.DefaultHTTPTimeouts, paymentHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utils.Fatalf("Could not start RPC api: %v", err)
|
utils.Fatalf("Could not start RPC api: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user