deals: more atomic voucher handling

This commit is contained in:
Łukasz Magiera 2019-09-25 14:56:04 +02:00
parent 72bb5c6919
commit 2fe4ffdfdc
9 changed files with 132 additions and 150 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -123,7 +122,6 @@ type FullNode interface {
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychClose(context.Context, address.Address) (cid.Cid, error)
PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error)
PaychLaneState(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error)
PaychNewPayment(ctx context.Context, from, to address.Address, vouchers []VoucherSpec) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error)

View File

@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -92,7 +91,6 @@ type FullNodeStruct struct {
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
PaychAllocateLane func(context.Context, address.Address) (uint64, error) `perm:"sign"`
PaychNewPayment func(ctx context.Context, from, to address.Address, vouchers []VoucherSpec) (*PaymentInfo, error) `perm:"sign"`
PaychLaneState func(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error) `perm:"write"`
PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckSpendable func(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"`
@ -368,10 +366,6 @@ func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.A
return c.Internal.PaychNewPayment(ctx, from, to, vouchers)
}
func (c *FullNodeStruct) PaychLaneState(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error) {
return c.Internal.PaychLaneState(ctx, ch, lane)
}
func (c *FullNodeStruct) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) {
return c.Internal.PaychVoucherSubmit(ctx, ch, sv)
}

View File

@ -37,7 +37,7 @@ type PaymentChannelActorState struct {
ClosingAt uint64
MinCloseHeight uint64
// TODO: needs to be map[uint64]*LaneState
// TODO: needs to be map[uint64]*laneState
// waiting on refmt#35 to be fixed
LaneStates map[string]*LaneState
}

View File

@ -1894,7 +1894,7 @@ func (t *PaymentChannelActorState) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.LaneStates (map[string]*actors.LaneState)
// t.t.LaneStates (map[string]*actors.laneState)
if err := cbg.CborWriteHeader(w, cbg.MajMap, uint64(len(t.LaneStates))); err != nil {
return err
}
@ -1978,7 +1978,7 @@ func (t *PaymentChannelActorState) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field")
}
t.MinCloseHeight = extra
// t.t.LaneStates (map[string]*actors.LaneState)
// t.t.LaneStates (map[string]*actors.laneState)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {

View File

@ -13,7 +13,6 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
@ -43,7 +42,61 @@ func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFun
// ACCEPTED
func (h *Handler) validateVouchers(ctx context.Context, deal MinerDeal, paych address.Address) error {
func (h *Handler) checkVoucher(ctx context.Context, deal MinerDeal, voucher *types.SignedVoucher, lane uint64, maxClose uint64, amount types.BigInt) error {
err := h.full.PaychVoucherCheckValid(ctx, deal.Proposal.Payment.PayChActor, voucher)
if err != nil {
return err
}
if voucher.Extra == nil {
return xerrors.New("voucher.Extra not set")
}
if voucher.Extra.Actor != deal.Proposal.MinerAddress {
return xerrors.Errorf("extra params actor didn't match miner address in proposal: '%s' != '%s'", voucher.Extra.Actor, deal.Proposal.MinerAddress)
}
if voucher.Extra.Method != actors.MAMethods.PaymentVerifyInclusion {
return xerrors.Errorf("expected extra method %d, got %d", actors.MAMethods.PaymentVerifyInclusion, voucher.Extra.Method)
}
var inclChallenge actors.PieceInclVoucherData
if err := cbor.DecodeInto(voucher.Extra.Data, &inclChallenge); err != nil {
return xerrors.Errorf("failed to decode storage voucher data for verification: %w", err)
}
if inclChallenge.PieceSize.Uint64() != deal.Proposal.Size {
return xerrors.Errorf("paych challenge piece size didn't match deal proposal size: %d != %d", inclChallenge.PieceSize.Uint64(), deal.Proposal.Size)
}
if !bytes.Equal(inclChallenge.CommP, deal.Proposal.CommP) {
return xerrors.New("paych challenge commP didn't match deal proposal")
}
if voucher.MinCloseHeight > maxClose {
return xerrors.Errorf("MinCloseHeight too high (%d), max expected: %d", voucher.MinCloseHeight, maxClose)
}
if voucher.TimeLock > maxClose {
return xerrors.Errorf("TimeLock too high (%d), max expected: %d", voucher.TimeLock, maxClose)
}
if len(voucher.Merges) > 0 {
return xerrors.New("didn't expect any merges")
}
if voucher.Amount.LessThan(amount) {
return xerrors.Errorf("not enough funds in the voucher: %s < %s; vl=%d", voucher.Amount, amount, len(deal.Proposal.Payment.Vouchers))
}
if voucher.Lane != lane {
return xerrors.Errorf("expected all vouchers on lane %d, found voucher on lane %d", lane, voucher.Lane)
}
return nil
}
func (h *Handler) consumeVouchers(ctx context.Context, deal MinerDeal) error {
curHead, err := h.full.ChainHead(ctx)
if err != nil {
return err
@ -64,47 +117,10 @@ func (h *Handler) validateVouchers(ctx context.Context, deal MinerDeal, paych ad
lane := deal.Proposal.Payment.Vouchers[0].Lane
for i, voucher := range deal.Proposal.Payment.Vouchers {
err := h.full.PaychVoucherCheckValid(ctx, deal.Proposal.Payment.PayChActor, voucher)
if err != nil {
return xerrors.Errorf("validating payment voucher %d: %w", i, err)
}
if voucher.Extra == nil {
return xerrors.Errorf("validating payment voucher %d: voucher.Extra not set")
}
if voucher.Extra.Actor != deal.Proposal.MinerAddress {
return xerrors.Errorf("validating payment voucher %d: extra params actor didn't match miner address in proposal: '%s' != '%s'", i, voucher.Extra.Actor, deal.Proposal.MinerAddress)
}
if voucher.Extra.Method != actors.MAMethods.PaymentVerifyInclusion {
return xerrors.Errorf("validating payment voucher %d: expected extra method %d, got %d", i, actors.MAMethods.PaymentVerifyInclusion, voucher.Extra.Method)
}
var inclChallenge actors.PieceInclVoucherData
if err := cbor.DecodeInto(voucher.Extra.Data, &inclChallenge); err != nil {
return xerrors.Errorf("validating payment voucher %d: failed to decode storage voucher data for verification: %w", i, err)
}
if inclChallenge.PieceSize.Uint64() != deal.Proposal.Size {
return xerrors.Errorf("validating payment voucher %d: paych challenge piece size didn't match deal proposal size: %d != %d", i, inclChallenge.PieceSize.Uint64(), deal.Proposal.Size)
}
if !bytes.Equal(inclChallenge.CommP, deal.Proposal.CommP) {
return xerrors.Errorf("validating payment voucher %d: paych challenge commP didn't match deal proposal", i)
}
maxClose := curHead.Height() + (increments * uint64(i+1)) + build.DealVoucherSkewLimit
if voucher.MinCloseHeight > maxClose {
return xerrors.Errorf("validating payment voucher %d: MinCloseHeight too high (%d), max expected: %d", i, voucher.MinCloseHeight, maxClose)
}
if voucher.TimeLock > maxClose {
return xerrors.Errorf("validating payment voucher %d: TimeLock too high (%d), max expected: %d", i, voucher.TimeLock, maxClose)
}
if len(voucher.Merges) > 0 {
return xerrors.Errorf("validating payment voucher %d: didn't expect any merges", i)
}
if voucher.Amount.LessThan(vspec[i].Amount) {
return xerrors.Errorf("validating payment voucher %d: not enough funds in the voucher: %s < %s; vl=%d vsl=%d", i, voucher.Amount, vspec[i].Amount, len(deal.Proposal.Payment.Vouchers), len(vspec))
if err := h.checkVoucher(ctx, deal, voucher, lane, maxClose, vspec[i].Amount); err != nil {
return xerrors.Errorf("validating payment voucher %d: %w", i, err)
}
}
@ -113,23 +129,14 @@ func (h *Handler) validateVouchers(ctx context.Context, deal MinerDeal, paych ad
return xerrors.Errorf("minimum price: %s", minPrice)
}
// TODO: This is mildly racy, we need a way to check lane and submit voucher
// atomically
laneState, err := h.full.PaychLaneState(ctx, paych, lane)
if err != nil {
return xerrors.Errorf("looking up payment channel lane: %w", err)
}
prevAmt := types.NewInt(0)
if laneState.Closed {
return xerrors.New("lane closed")
}
if laneState.Redeemed.GreaterThan(types.NewInt(0)) {
return xerrors.New("used lanes unsupported: lane redeemed amount was non-zero")
}
if laneState.Nonce > 0 {
return xerrors.New("used lanes unsupported: nonce higher than zero")
for i, voucher := range deal.Proposal.Payment.Vouchers {
delta, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil, types.BigSub(vspec[i].Amount, prevAmt))
if err != nil {
return xerrors.Errorf("consuming payment voucher %d: %w", i, err)
}
prevAmt = types.BigAdd(prevAmt, delta)
}
return nil
@ -151,17 +158,10 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
}
}
if err := h.validateVouchers(ctx, deal, deal.Proposal.Payment.PayChActor); err != nil {
if err := h.consumeVouchers(ctx, deal); err != nil {
return nil, err
}
for i, voucher := range deal.Proposal.Payment.Vouchers {
// TODO: Set correct minAmount
if _, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil, types.NewInt(0)); err != nil {
return nil, xerrors.Errorf("consuming payment voucher %d: %w", i, err)
}
}
log.Info("fetching data for a deal")
err := h.sendSignedResponse(StorageDealResponse{
State: api.DealAccepted,

View File

@ -136,10 +136,6 @@ func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Ci
return smsg.Cid(), nil
}
func (a *PaychAPI) PaychLaneState(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error) {
return a.PaychMgr.LaneState(ctx, ch, lane)
}
func (a *PaychAPI) PaychVoucherCheckValid(ctx context.Context, ch address.Address, sv *types.SignedVoucher) error {
return a.PaychMgr.CheckVoucherValid(ctx, ch, sv)
}
@ -151,10 +147,6 @@ func (a *PaychAPI) PaychVoucherCheckSpendable(ctx context.Context, ch address.Ad
func (a *PaychAPI) PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
_ = a.PaychMgr.TrackInboundChannel(ctx, ch) // TODO: expose those calls
if err := a.PaychVoucherCheckValid(ctx, ch, sv); err != nil {
return types.NewInt(0), err
}
return a.PaychMgr.AddVoucher(ctx, ch, sv, proof, minDelta)
}

View File

@ -1,8 +1,10 @@
package paych
import (
"bytes"
"context"
"fmt"
"golang.org/x/xerrors"
"math"
"strconv"
@ -243,7 +245,65 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *types
return types.NewInt(0), err
}
return pm.store.AddVoucher(ch, sv, proof, minDelta)
pm.store.lk.Lock()
defer pm.store.lk.Unlock()
ci, err := pm.store.getChannelInfo(ch)
if err != nil {
return types.NewInt(0), err
}
laneState, err := pm.laneState(ctx, ch, sv.Lane)
if err != nil {
return types.NewInt(0), err
}
if laneState.Closed {
return types.NewInt(0), xerrors.New("lane closed")
}
if laneState.Nonce > sv.Nonce {
return types.NewInt(0), xerrors.Errorf("already storing voucher with higher nonce; %d > %d", laneState.Nonce, sv.Nonce)
}
// look for duplicates
for i, v := range ci.Vouchers {
if !sv.Equals(v.Voucher) {
continue
}
if v.Proof != nil {
if !bytes.Equal(v.Proof, proof) {
log.Warnf("AddVoucher: multiple proofs for single voucher, storing both")
break
}
log.Warnf("AddVoucher: voucher re-added with matching proof")
return types.NewInt(0), nil
}
log.Warnf("AddVoucher: adding proof to stored voucher")
ci.Vouchers[i] = &VoucherInfo{
Voucher: v.Voucher,
Proof: proof,
}
return types.NewInt(0), pm.store.putChannelInfo(ci)
}
delta := types.BigSub(sv.Amount, laneState.Redeemed)
if minDelta.GreaterThan(delta) {
return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, laneState.Redeemed, sv.Amount)
}
ci.Vouchers = append(ci.Vouchers, &VoucherInfo{
Voucher: sv,
Proof: proof,
})
if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1
}
return delta, pm.store.putChannelInfo(ci)
}
func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) {

View File

@ -19,7 +19,7 @@ func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*typ
return act, &pcast, nil
}
func (pm *Manager) LaneState(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error) {
func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint64) (actors.LaneState, error) {
_, state, err := pm.loadPaychState(ctx, ch)
if err != nil {
return actors.LaneState{}, err
@ -39,7 +39,7 @@ func (pm *Manager) LaneState(ctx context.Context, ch address.Address, lane uint6
}
if ls.Closed {
return actors.LaneState{}, nil
return *ls, nil
}
vouchers, err := pm.store.VouchersForPaych(ch)
@ -52,7 +52,7 @@ func (pm *Manager) LaneState(ctx context.Context, ch address.Address, lane uint6
for _, v := range vouchers {
for range v.Voucher.Merges {
panic("merges todo")
panic("merges todo") // TODO: nonce check
}
if v.Voucher.Lane != lane {

View File

@ -1,10 +1,8 @@
package paych
import (
"bytes"
"errors"
"fmt"
"math"
"strings"
"sync"
@ -182,66 +180,6 @@ func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, erro
return address.Undef, nil
}
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return types.NewInt(0), err
}
var bestAmount types.BigInt
var bestNonce uint64 = math.MaxUint64
// look for duplicates
for i, v := range ci.Vouchers {
if v.Voucher.Lane == sv.Lane && v.Voucher.Nonce+1 > bestNonce+1 {
bestNonce = v.Voucher.Nonce
bestAmount = v.Voucher.Amount
}
if !sv.Equals(v.Voucher) {
continue
}
if v.Proof != nil {
if !bytes.Equal(v.Proof, proof) {
log.Warnf("AddVoucher: multiple proofs for single voucher, storing both")
break
}
log.Warnf("AddVoucher: voucher re-added with matching proof")
return types.NewInt(0), nil
}
log.Warnf("AddVoucher: adding proof to stored voucher")
ci.Vouchers[i] = &VoucherInfo{
Voucher: v.Voucher,
Proof: proof,
}
return types.NewInt(0), ps.putChannelInfo(ci)
}
if bestAmount == (types.BigInt{}) {
bestAmount = types.NewInt(0)
}
delta := types.BigSub(sv.Amount, bestAmount)
if minDelta.GreaterThan(delta) {
return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; bestAmt=%s; v.Amt=%s", minDelta, delta, bestAmount, sv.Amount)
}
ci.Vouchers = append(ci.Vouchers, &VoucherInfo{
Voucher: sv,
Proof: proof,
})
if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1
}
return delta, ps.putChannelInfo(ci)
}
func (ps *Store) AllocateLane(ch address.Address) (uint64, error) {
ps.lk.Lock()
defer ps.lk.Unlock()