paych: Api to reuse open channels for new payments

This commit is contained in:
Łukasz Magiera 2019-09-10 15:43:01 +02:00
parent 72a406ec7e
commit 388e3ffa96
7 changed files with 256 additions and 80 deletions

View File

@ -111,6 +111,7 @@ type FullNode interface {
PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychClose(context.Context, address.Address) (cid.Cid, error)
PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error)
PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error)
@ -194,6 +195,11 @@ type PaychStatus struct {
Direction PCHDir
}
type PaymentInfo struct {
Channel address.Address
Voucher *types.SignedVoucher
}
type MinerPower struct {
MinerPower types.BigInt
TotalPower types.BigInt

View File

@ -84,6 +84,7 @@ type FullNodeStruct struct {
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"`
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
PaychNewPayment func(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) `perm:"sign"`
PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckSpendable func(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"`
@ -324,6 +325,10 @@ func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid
return c.Internal.PaychClose(ctx, a)
}
func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) {
return c.Internal.PaychNewPayment(ctx, from, to, amount, extra, tl, minClose)
}
func (c *FullNodeStruct) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) {
return c.Internal.PaychVoucherSubmit(ctx, ch, sv)
}

View File

@ -4,8 +4,6 @@ import (
"bytes"
"fmt"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors/aerrors"
"github.com/filecoin-project/go-lotus/chain/address"
@ -17,7 +15,7 @@ type PaymentChannelActor struct{}
type PaymentInfo struct {
PayChActor address.Address
Payer address.Address
ChannelMessage cid.Cid
// TODO: update spec to not include channel msg
Vouchers []*types.SignedVoucher
}

View File

@ -86,26 +86,14 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
total := types.BigMul(price, types.NewInt(blocksDuration))
// TODO: at least ping the miner before creating paych / locking the money
paych, paychMsg, err := a.paychCreate(ctx, self, miner, total)
if err != nil {
return nil, err
}
head := a.Chain.GetHeaviestTipSet()
voucher := types.SignedVoucher{ // TODO: split into smaller payments
TimeLock: head.Height() + blocksDuration,
Extra: &types.ModVerifyParams{
extra := &types.ModVerifyParams{
Actor: miner,
Method: actors.MAMethods.PaymentVerifyInclusion,
Data: voucherData,
},
Lane: 0, // TODO: some api to make this easy
Amount: total,
MinCloseHeight: head.Height() + blocksDuration, // TODO: some way to start this after initial piece inclusion by actor? Using actors.PieceInclVoucherData?
}
sv, err := a.paychVoucherCreate(ctx, paych, voucher)
head := a.Chain.GetHeaviestTipSet()
payment, err := a.PaychNewPayment(ctx, self, miner, total, extra, head.Height()+blocksDuration, head.Height()+blocksDuration)
if err != nil {
return nil, err
}
@ -115,10 +103,9 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
TotalPrice: total,
Duration: blocksDuration,
Payment: actors.PaymentInfo{
PayChActor: paych,
PayChActor: payment.Channel,
Payer: self,
ChannelMessage: paychMsg,
Vouchers: []*types.SignedVoucher{sv},
Vouchers: []*types.SignedVoucher{payment.Voucher},
},
MinerAddress: miner,
ClientAddress: self,

View File

@ -26,19 +26,14 @@ type PaychAPI struct {
}
func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) {
act, _, err := a.paychCreate(ctx, from, to, amt)
return act, err
}
func (a *PaychAPI) paychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil {
return address.Undef, cid.Undef, aerr
return address.Undef, aerr
}
nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil {
return address.Undef, cid.Undef, err
return address.Undef, err
}
enc, err := actors.SerializeParams(&actors.ExecParams{
@ -57,44 +52,108 @@ func (a *PaychAPI) paychCreate(ctx context.Context, from, to address.Address, am
GasPrice: types.NewInt(0),
}
ser, err := msg.Serialize()
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return address.Undef, cid.Undef, err
}
sig, err := a.WalletSign(ctx, from, ser)
if err != nil {
return address.Undef, cid.Undef, err
}
smsg := &types.SignedMessage{
Message: *msg,
Signature: *sig,
return address.Address{}, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return address.Undef, cid.Undef, err
return address.Undef, err
}
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return address.Undef, cid.Undef, err
return address.Undef, err
}
if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
return address.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
}
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil {
return address.Undef, cid.Undef, err
return address.Undef, err
}
if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil {
return address.Undef, cid.Undef, err
return address.Undef, err
}
return paychaddr, msg.Cid(), nil
return paychaddr, nil
}
func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*api.PaymentInfo, error) {
ch, err := a.PaychMgr.OutboundChanTo(from, to)
if err != nil {
return nil, err
}
if ch == address.Undef {
// don't have matching channel, open new
// TODO: this should be more atomic
ch, err = a.PaychCreate(ctx, from, to, amount)
if err != nil {
return nil, err
}
} else {
// already have chanel to the destination, add funds, and open a new lane
// TODO: track free funds in channel
nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil {
return nil, err
}
msg := &types.Message{
To: ch,
From: from,
Value: amount,
Nonce: nonce,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
}
lane, err := a.PaychMgr.AllocateLane(ch)
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch, types.SignedVoucher{
Amount: amount,
Lane: lane,
Extra: extra,
TimeLock: tl,
MinCloseHeight: minClose,
})
if err != nil {
return nil, err
}
return &api.PaymentInfo{
Channel: ch,
Voucher: sv,
}, nil
}
func (a *PaychAPI) PaychList(ctx context.Context) ([]address.Address, error) {
@ -107,7 +166,7 @@ func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.P
return nil, err
}
return &api.PaychStatus{
ControlAddr: ci.ControlAddr,
ControlAddr: ci.Control,
Direction: api.PCHDir(ci.Direction),
}, nil
}
@ -118,14 +177,14 @@ func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Ci
return cid.Undef, err
}
nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr)
nonce, err := a.MpoolGetNonce(ctx, ci.Control)
if err != nil {
return cid.Undef, err
}
msg := &types.Message{
To: addr,
From: ci.ControlAddr,
From: ci.Control,
Value: types.NewInt(0),
Method: actors.PCAMethods.Close,
Nonce: nonce,
@ -134,7 +193,7 @@ func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Ci
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg)
smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
if err != nil {
return cid.Undef, err
}
@ -192,7 +251,7 @@ func (a *PaychAPI) paychVoucherCreate(ctx context.Context, pch address.Address,
return nil, err
}
sig, err := a.WalletSign(ctx, ci.ControlAddr, vb)
sig, err := a.WalletSign(ctx, ci.Control, vb)
if err != nil {
return nil, err
}
@ -226,7 +285,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
return cid.Undef, err
}
nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr)
nonce, err := a.MpoolGetNonce(ctx, ci.Control)
if err != nil {
return cid.Undef, err
}
@ -243,7 +302,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
}
msg := &types.Message{
From: ci.ControlAddr,
From: ci.Control,
To: ch,
Value: types.NewInt(0),
Nonce: nonce,
@ -253,7 +312,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg)
smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
if err != nil {
return cid.Undef, err
}

View File

@ -3,6 +3,8 @@ package paych
import (
"context"
"fmt"
"math"
"strconv"
logging "github.com/ipfs/go-log"
@ -26,16 +28,38 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager {
}
}
func maxLaneFromState(st *actors.PaymentChannelActorState) (uint64, error) {
maxLane := uint64(math.MaxUint64)
for lane := range st.LaneStates {
ilane, err := strconv.ParseUint(lane, 10, 64)
if err != nil {
return 0, err
}
if ilane+1 > maxLane+1 {
maxLane = ilane
}
}
return maxLane, nil
}
func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error {
_, st, err := pm.loadPaychState(ctx, ch)
if err != nil {
return err
}
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
}
return pm.store.TrackChannel(&ChannelInfo{
Channel: ch,
Control: st.To,
Target: st.From,
Direction: DirInbound,
ControlAddr: st.To,
NextLane: maxLane + 1,
})
}
@ -45,10 +69,18 @@ func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address)
return err
}
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
}
return pm.store.TrackChannel(&ChannelInfo{
Channel: ch,
Control: st.From,
Target: st.To,
Direction: DirOutbound,
ControlAddr: st.From,
NextLane: maxLane + 1,
})
}
@ -197,12 +229,25 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *types
return pm.store.AddVoucher(ch, sv, proof)
}
func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) {
return pm.store.AllocateLane(ch)
}
func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*VoucherInfo, error) {
// TODO: just having a passthrough method like this feels odd. Seems like
// there should be some filtering we're doing here
return pm.store.VouchersForPaych(ch)
}
func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, error) {
return pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
}
return ci.Control == from && ci.Target == to
})
}
func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) {
vouchers, err := pm.store.VouchersForPaych(ch)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
@ -24,6 +25,8 @@ func init() {
}
type Store struct {
lk sync.Mutex // TODO: this can be split per paych
ds datastore.Batching
}
@ -46,9 +49,12 @@ type VoucherInfo struct {
type ChannelInfo struct {
Channel address.Address
ControlAddr address.Address
Control address.Address
Target address.Address
Direction int
Vouchers []*VoucherInfo
NextLane uint64
}
func dskeyForChannel(addr address.Address) datastore.Key {
@ -86,6 +92,9 @@ func (ps *Store) getChannelInfo(addr address.Address) (*ChannelInfo, error) {
}
func (ps *Store) TrackChannel(ch *ChannelInfo) error {
ps.lk.Lock()
defer ps.lk.Unlock()
_, err := ps.getChannelInfo(ch.Channel)
switch err {
default:
@ -98,10 +107,14 @@ func (ps *Store) TrackChannel(ch *ChannelInfo) error {
}
func (ps *Store) ListChannels() ([]address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{KeysOnly: true})
if err != nil {
return nil, err
}
defer res.Close()
var out []address.Address
for {
@ -125,7 +138,51 @@ func (ps *Store) ListChannels() ([]address.Address, error) {
return out, nil
}
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err
}
defer res.Close()
var ci ChannelInfo
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return address.Undef, err
}
if err := cbor.DecodeInto(res.Value, &ci); err != nil {
return address.Undef, err
}
if !filter(&ci) {
continue
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return address.Undef, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
return addr, nil
}
return address.Undef, nil
}
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte) error {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return err
@ -159,9 +216,28 @@ func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof [
Proof: proof,
})
if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1
}
return ps.putChannelInfo(ci)
}
func (ps *Store) AllocateLane(ch address.Address) (uint64, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return 0, err
}
out := ci.NextLane
ci.NextLane++
return out, ps.putChannelInfo(ci)
}
func (ps *Store) VouchersForPaych(ch address.Address) ([]*VoucherInfo, error) {
ci, err := ps.getChannelInfo(ch)
if err != nil {