Merge pull request #2448 from filecoin-project/feat/payment-channel-settler
Submit vouchers automatically when payment channels settle
This commit is contained in:
commit
7b00bb1ffe
@ -367,7 +367,8 @@ type FullNode interface {
|
||||
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
|
||||
PaychList(context.Context) ([]address.Address, error)
|
||||
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
|
||||
PaychClose(context.Context, address.Address) (cid.Cid, error)
|
||||
PaychSettle(context.Context, address.Address) (cid.Cid, error)
|
||||
PaychCollect(context.Context, address.Address) (cid.Cid, error)
|
||||
PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error)
|
||||
PaychNewPayment(ctx context.Context, from, to address.Address, vouchers []VoucherSpec) (*PaymentInfo, error)
|
||||
PaychVoucherCheckValid(context.Context, address.Address, *paych.SignedVoucher) error
|
||||
|
@ -180,7 +180,8 @@ type FullNodeStruct struct {
|
||||
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
|
||||
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
|
||||
PaychStatus func(context.Context, address.Address) (*api.PaychStatus, error) `perm:"read"`
|
||||
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
|
||||
PaychSettle func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
|
||||
PaychCollect func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
|
||||
PaychAllocateLane func(context.Context, address.Address) (uint64, error) `perm:"sign"`
|
||||
PaychNewPayment func(ctx context.Context, from, to address.Address, vouchers []api.VoucherSpec) (*api.PaymentInfo, error) `perm:"sign"`
|
||||
PaychVoucherCheck func(context.Context, *paych.SignedVoucher) error `perm:"read"`
|
||||
@ -805,8 +806,12 @@ func (c *FullNodeStruct) PaychVoucherList(ctx context.Context, pch address.Addre
|
||||
return c.Internal.PaychVoucherList(ctx, pch)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid.Cid, error) {
|
||||
return c.Internal.PaychClose(ctx, a)
|
||||
func (c *FullNodeStruct) PaychSettle(ctx context.Context, a address.Address) (cid.Cid, error) {
|
||||
return c.Internal.PaychSettle(ctx, a)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) PaychCollect(ctx context.Context, a address.Address) (cid.Cid, error) {
|
||||
return c.Internal.PaychCollect(ctx, a)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) PaychAllocateLane(ctx context.Context, ch address.Address) (uint64, error) {
|
||||
|
286
api/test/paych.go
Normal file
286
api/test/paych.go
Normal file
@ -0,0 +1,286 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
"github.com/filecoin-project/lotus/chain/events/state"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
)
|
||||
|
||||
func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
||||
_ = os.Setenv("BELLMAN_NO_GPU", "1")
|
||||
|
||||
ctx := context.Background()
|
||||
n, sn := b(t, 2, oneMiner)
|
||||
|
||||
paymentCreator := n[0]
|
||||
paymentReceiver := n[1]
|
||||
miner := sn[0]
|
||||
|
||||
// get everyone connected
|
||||
addrs, err := paymentCreator.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := paymentReceiver.NetConnect(ctx, addrs); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := miner.NetConnect(ctx, addrs); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// start mining blocks
|
||||
bm := newBlockMiner(ctx, t, miner, blocktime)
|
||||
bm.mineBlocks()
|
||||
|
||||
// send some funds to register the receiver
|
||||
receiverAddr, err := paymentReceiver.WalletNew(ctx, wallet.ActSigType("secp256k1"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sendFunds(ctx, t, paymentCreator, receiverAddr, abi.NewTokenAmount(1e10))
|
||||
|
||||
// setup the payment channel
|
||||
createrAddr, err := paymentCreator.WalletDefaultAddress(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
channelAmt := int64(100000)
|
||||
channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
res := waitForMessage(ctx, t, paymentCreator, channelInfo.ChannelMessage, time.Second, "channel create")
|
||||
var params initactor.ExecReturn
|
||||
err = params.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
channel := params.RobustAddress
|
||||
|
||||
// allocate three lanes
|
||||
var lanes []uint64
|
||||
for i := 0; i < 3; i++ {
|
||||
lane, err := paymentCreator.PaychAllocateLane(ctx, channel)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
lanes = append(lanes, lane)
|
||||
}
|
||||
|
||||
// Make two vouchers each for each lane, then save on the other side
|
||||
// Note that the voucher with a value of 2000 has a higher nonce, so it
|
||||
// supersedes the voucher with a value of 1000
|
||||
for _, lane := range lanes {
|
||||
vouch1, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(1000), lane)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
vouch2, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(2000), lane)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
delta1, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch1, nil, abi.NewTokenAmount(1000))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !delta1.Equals(abi.NewTokenAmount(1000)) {
|
||||
t.Fatal("voucher didn't have the right amount")
|
||||
}
|
||||
delta2, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouch2, nil, abi.NewTokenAmount(1000))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !delta2.Equals(abi.NewTokenAmount(1000)) {
|
||||
t.Fatal("voucher didn't have the right amount")
|
||||
}
|
||||
}
|
||||
|
||||
// settle the payment channel
|
||||
settleMsgCid, err := paymentCreator.PaychSettle(ctx, channel)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
res = waitForMessage(ctx, t, paymentCreator, settleMsgCid, time.Second*10, "settle")
|
||||
if res.Receipt.ExitCode != 0 {
|
||||
t.Fatal("Unable to settle payment channel")
|
||||
}
|
||||
|
||||
creatorPreCollectBalance, err := paymentCreator.WalletBalance(ctx, createrAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// wait for the receiver to submit their vouchers
|
||||
ev := events.NewEvents(ctx, paymentCreator)
|
||||
preds := state.NewStatePredicates(paymentCreator)
|
||||
finished := make(chan struct{})
|
||||
err = ev.StateChanged(func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
act, err := paymentCreator.StateReadState(ctx, channel, ts.Key())
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
state := act.State.(paych.State)
|
||||
if state.ToSend.GreaterThanEqual(abi.NewTokenAmount(6000)) {
|
||||
return true, false, nil
|
||||
}
|
||||
return false, true, nil
|
||||
}, func(oldTs, newTs *types.TipSet, states events.StateChange, curH abi.ChainEpoch) (more bool, err error) {
|
||||
toSendChange := states.(*state.PayChToSendChange)
|
||||
if toSendChange.NewToSend.GreaterThanEqual(abi.NewTokenAmount(6000)) {
|
||||
close(finished)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||
return nil
|
||||
}, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
||||
return preds.OnPaymentChannelActorChanged(channel, preds.OnToSendAmountChanges())(ctx, oldTs.Key(), newTs.Key())
|
||||
})
|
||||
|
||||
select {
|
||||
case <-finished:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timed out waiting for receiver to submit vouchers")
|
||||
}
|
||||
|
||||
// collect funds (from receiver, though either party can do it)
|
||||
collectMsg, err := paymentReceiver.PaychCollect(ctx, channel)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
res, err = paymentReceiver.StateWaitMsg(ctx, collectMsg, 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if res.Receipt.ExitCode != 0 {
|
||||
t.Fatal("unable to collect on payment channel")
|
||||
}
|
||||
|
||||
// Finally, check the balance for the creator
|
||||
currentCreatorBalance, err := paymentCreator.WalletBalance(ctx, createrAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The highest nonce voucher that the creator sent on each lane is 2000
|
||||
totalVouchers := int64(len(lanes) * 2000)
|
||||
// When receiver submits the tokens to the chain, creator should get a
|
||||
// refund on the remaining balance, which is
|
||||
// channel amount - total voucher value
|
||||
expectedRefund := channelAmt - totalVouchers
|
||||
delta := big.Sub(currentCreatorBalance, creatorPreCollectBalance)
|
||||
if !delta.Equals(abi.NewTokenAmount(expectedRefund)) {
|
||||
t.Fatalf("did not send correct funds from creator: expected %d, got %d", expectedRefund, delta)
|
||||
}
|
||||
|
||||
// shut down mining
|
||||
bm.stop()
|
||||
}
|
||||
|
||||
func waitForMessage(ctx context.Context, t *testing.T, paymentCreator TestNode, msgCid cid.Cid, duration time.Duration, desc string) *api.MsgLookup {
|
||||
ctx, cancel := context.WithTimeout(ctx, duration)
|
||||
defer cancel()
|
||||
|
||||
fmt.Println("Waiting for", desc)
|
||||
res, err := paymentCreator.StateWaitMsg(ctx, msgCid, 1)
|
||||
if err != nil {
|
||||
fmt.Println("Error waiting for", desc, err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if res.Receipt.ExitCode != 0 {
|
||||
t.Fatalf("did not successfully send %s", desc)
|
||||
}
|
||||
fmt.Println("Confirmed", desc)
|
||||
return res
|
||||
}
|
||||
|
||||
type blockMiner struct {
|
||||
ctx context.Context
|
||||
t *testing.T
|
||||
miner TestStorageNode
|
||||
blocktime time.Duration
|
||||
mine int64
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newBlockMiner(ctx context.Context, t *testing.T, miner TestStorageNode, blocktime time.Duration) *blockMiner {
|
||||
return &blockMiner{
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
miner: miner,
|
||||
blocktime: blocktime,
|
||||
mine: int64(1),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (bm *blockMiner) mineBlocks() {
|
||||
time.Sleep(time.Second)
|
||||
go func() {
|
||||
defer close(bm.done)
|
||||
for atomic.LoadInt64(&bm.mine) == 1 {
|
||||
time.Sleep(bm.blocktime)
|
||||
if err := bm.miner.MineOne(bm.ctx, func(bool, error) {}); err != nil {
|
||||
bm.t.Error(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (bm *blockMiner) stop() {
|
||||
atomic.AddInt64(&bm.mine, -1)
|
||||
fmt.Println("shutting down mining")
|
||||
<-bm.done
|
||||
}
|
||||
|
||||
func sendFunds(ctx context.Context, t *testing.T, sender TestNode, addr address.Address, amount abi.TokenAmount) {
|
||||
|
||||
senderAddr, err := sender.WalletDefaultAddress(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
From: senderAddr,
|
||||
To: addr,
|
||||
Value: amount,
|
||||
GasLimit: 0,
|
||||
GasPrice: abi.NewTokenAmount(0),
|
||||
}
|
||||
|
||||
sm, err := sender.MpoolPushMessage(ctx, msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
res, err := sender.StateWaitMsg(ctx, sm.Cid(), 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if res.Receipt.ExitCode != 0 {
|
||||
t.Fatal("did not successfully send money")
|
||||
}
|
||||
}
|
@ -3,6 +3,7 @@ package state
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
@ -12,6 +13,7 @@ import (
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
|
||||
"github.com/filecoin-project/lotus/api/apibstore"
|
||||
@ -493,3 +495,40 @@ func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc {
|
||||
return true, precommitChanges, nil
|
||||
}
|
||||
}
|
||||
|
||||
// DiffPaymentChannelStateFunc is function that compares two states for the payment channel
|
||||
type DiffPaymentChannelStateFunc func(ctx context.Context, oldState *paych.State, newState *paych.State) (changed bool, user UserData, err error)
|
||||
|
||||
// OnPaymentChannelActorChanged calls diffPaymentChannelState when the state changes for the the payment channel actor
|
||||
func (sp *StatePredicates) OnPaymentChannelActorChanged(paychAddr address.Address, diffPaymentChannelState DiffPaymentChannelStateFunc) DiffTipSetKeyFunc {
|
||||
return sp.OnActorStateChanged(paychAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
|
||||
var oldState paych.State
|
||||
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
var newState paych.State
|
||||
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
return diffPaymentChannelState(ctx, &oldState, &newState)
|
||||
})
|
||||
}
|
||||
|
||||
// PayChToSendChange is a difference in the amount to send on a payment channel when the money is collected
|
||||
type PayChToSendChange struct {
|
||||
OldToSend abi.TokenAmount
|
||||
NewToSend abi.TokenAmount
|
||||
}
|
||||
|
||||
// OnToSendAmountChanges monitors changes on the total amount to send from one party to the other on a payment channel
|
||||
func (sp *StatePredicates) OnToSendAmountChanges() DiffPaymentChannelStateFunc {
|
||||
return func(ctx context.Context, oldState *paych.State, newState *paych.State) (changed bool, user UserData, err error) {
|
||||
if oldState.ToSend.Equals(newState.ToSend) {
|
||||
return false, nil, nil
|
||||
}
|
||||
return true, &PayChToSendChange{
|
||||
OldToSend: oldState.ToSend,
|
||||
NewToSend: newState.ToSend,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
@ -4,15 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/filecoin-project/go-amt-ipld/v2"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/account"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/multisig"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
@ -21,6 +12,15 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-amt-ipld/v2"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/account"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/multisig"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
@ -301,7 +301,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
vm, err := vm.NewVM(stateroot, 0, &fakeRand{}, cs.Blockstore(), &fakedSigSyscalls{cs.VMSys()})
|
||||
vm, err := vm.NewVM(stateroot, 0, &fakeRand{}, cs.Blockstore(), mkFakedSigSyscalls(cs.VMSys()))
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err)
|
||||
}
|
||||
@ -333,7 +333,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
|
||||
return st, nil
|
||||
}
|
||||
|
||||
func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys runtime.Syscalls, template genesis.Template) (*GenesisBootstrap, error) {
|
||||
func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallBuilder, template genesis.Template) (*GenesisBootstrap, error) {
|
||||
st, err := MakeInitialStateTree(ctx, bs, template)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("make initial state tree failed: %w", err)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"math/rand"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -46,8 +47,16 @@ func (fss *fakedSigSyscalls) VerifySignature(signature crypto.Signature, signer
|
||||
return nil
|
||||
}
|
||||
|
||||
func mkFakedSigSyscalls(base vm.SyscallBuilder) vm.SyscallBuilder {
|
||||
return func(ctx context.Context, cstate *state.StateTree, cst cbor.IpldStore) runtime.Syscalls {
|
||||
return &fakedSigSyscalls{
|
||||
base(ctx, cstate, cst),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid, miners []genesis.Miner) (cid.Cid, error) {
|
||||
vm, err := vm.NewVM(sroot, 0, &fakeRand{}, cs.Blockstore(), &fakedSigSyscalls{cs.VMSys()})
|
||||
vm, err := vm.NewVM(sroot, 0, &fakeRand{}, cs.Blockstore(), mkFakedSigSyscalls(cs.VMSys()))
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err)
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ func TestForkHeightTriggers(t *testing.T) {
|
||||
}
|
||||
|
||||
inv.Register(builtin.PaymentChannelActorCodeID, &testActor{}, &testActorState{})
|
||||
sm.SetVMConstructor(func(c cid.Cid, h abi.ChainEpoch, r vm.Rand, b blockstore.Blockstore, s runtime.Syscalls) (*vm.VM, error) {
|
||||
sm.SetVMConstructor(func(c cid.Cid, h abi.ChainEpoch, r vm.Rand, b blockstore.Blockstore, s vm.SyscallBuilder) (*vm.VM, error) {
|
||||
nvm, err := vm.NewVM(c, h, r, b, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
@ -41,7 +40,7 @@ type StateManager struct {
|
||||
stCache map[string][]cid.Cid
|
||||
compWait map[string]chan struct{}
|
||||
stlk sync.Mutex
|
||||
newVM func(cid.Cid, abi.ChainEpoch, vm.Rand, blockstore.Blockstore, runtime.Syscalls) (*vm.VM, error)
|
||||
newVM func(cid.Cid, abi.ChainEpoch, vm.Rand, blockstore.Blockstore, vm.SyscallBuilder) (*vm.VM, error)
|
||||
}
|
||||
|
||||
func NewStateManager(cs *store.ChainStore) *StateManager {
|
||||
@ -754,6 +753,6 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sm *StateManager) SetVMConstructor(nvm func(cid.Cid, abi.ChainEpoch, vm.Rand, blockstore.Blockstore, runtime.Syscalls) (*vm.VM, error)) {
|
||||
func (sm *StateManager) SetVMConstructor(nvm func(cid.Cid, abi.ChainEpoch, vm.Rand, blockstore.Blockstore, vm.SyscallBuilder) (*vm.VM, error)) {
|
||||
sm.newVM = nvm
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -85,10 +84,10 @@ type ChainStore struct {
|
||||
mmCache *lru.ARCCache
|
||||
tsCache *lru.ARCCache
|
||||
|
||||
vmcalls runtime.Syscalls
|
||||
vmcalls vm.SyscallBuilder
|
||||
}
|
||||
|
||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Syscalls) *ChainStore {
|
||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
|
||||
c, _ := lru.NewARC(2048)
|
||||
tsc, _ := lru.NewARC(4096)
|
||||
cs := &ChainStore{
|
||||
@ -903,7 +902,7 @@ func (cs *ChainStore) Store(ctx context.Context) adt.Store {
|
||||
return ActorStore(ctx, cs.bs)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) VMSys() runtime.Syscalls {
|
||||
func (cs *ChainStore) VMSys() vm.SyscallBuilder {
|
||||
return cs.vmcalls
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||
"github.com/filecoin-project/specs-actors/actors/puppet"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
vtypes "github.com/filecoin-project/chain-validation/chain/types"
|
||||
@ -25,12 +24,12 @@ import (
|
||||
// Applier applies messages to state trees and storage.
|
||||
type Applier struct {
|
||||
stateWrapper *StateWrapper
|
||||
syscalls runtime.Syscalls
|
||||
syscalls vm.SyscallBuilder
|
||||
}
|
||||
|
||||
var _ vstate.Applier = &Applier{}
|
||||
|
||||
func NewApplier(sw *StateWrapper, syscalls runtime.Syscalls) *Applier {
|
||||
func NewApplier(sw *StateWrapper, syscalls vm.SyscallBuilder) *Applier {
|
||||
return &Applier{sw, syscalls}
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,10 @@
|
||||
package validation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
|
||||
vstate "github.com/filecoin-project/chain-validation/state"
|
||||
)
|
||||
@ -18,7 +21,9 @@ func NewFactories() *Factories {
|
||||
|
||||
func (f *Factories) NewStateAndApplier(syscalls runtime.Syscalls) (vstate.VMWrapper, vstate.Applier) {
|
||||
st := NewState()
|
||||
return st, NewApplier(st, syscalls)
|
||||
return st, NewApplier(st, func(ctx context.Context, cstate *state.StateTree, cst cbor.IpldStore) runtime.Syscalls {
|
||||
return syscalls
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Factories) NewKeyManager() vstate.KeyManager {
|
||||
|
@ -32,15 +32,26 @@ func init() {
|
||||
|
||||
// Actual type is defined in chain/types/vmcontext.go because the VMContext interface is there
|
||||
|
||||
func Syscalls(verifier ffiwrapper.Verifier) runtime.Syscalls {
|
||||
return &syscallShim{verifier: verifier}
|
||||
type SyscallBuilder func(ctx context.Context, cstate *state.StateTree, cst cbor.IpldStore) runtime.Syscalls
|
||||
|
||||
func Syscalls(verifier ffiwrapper.Verifier) SyscallBuilder {
|
||||
return func(ctx context.Context, cstate *state.StateTree, cst cbor.IpldStore) runtime.Syscalls {
|
||||
return &syscallShim{
|
||||
ctx: ctx,
|
||||
|
||||
cstate: cstate,
|
||||
cst: cst,
|
||||
|
||||
verifier: verifier,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type syscallShim struct {
|
||||
ctx context.Context
|
||||
|
||||
cstate *state.StateTree
|
||||
cst *cbor.BasicIpldStore
|
||||
cst cbor.IpldStore
|
||||
verifier ffiwrapper.Verifier
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/account"
|
||||
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -115,7 +114,7 @@ func (vm *VM) makeRuntime(ctx context.Context, msg *types.Message, origin addres
|
||||
Atlas: vm.cst.Atlas,
|
||||
}
|
||||
rt.sys = pricedSyscalls{
|
||||
under: vm.Syscalls,
|
||||
under: vm.Syscalls(ctx, vm.cstate, rt.cst),
|
||||
chargeGas: rt.chargeGasFunc(1),
|
||||
pl: rt.pricelist,
|
||||
}
|
||||
@ -148,10 +147,10 @@ type VM struct {
|
||||
inv *Invoker
|
||||
rand Rand
|
||||
|
||||
Syscalls runtime.Syscalls
|
||||
Syscalls SyscallBuilder
|
||||
}
|
||||
|
||||
func NewVM(base cid.Cid, height abi.ChainEpoch, r Rand, cbs blockstore.Blockstore, syscalls runtime.Syscalls) (*VM, error) {
|
||||
func NewVM(base cid.Cid, height abi.ChainEpoch, r Rand, cbs blockstore.Blockstore, syscalls SyscallBuilder) (*VM, error) {
|
||||
buf := bufbstore.NewBufferedBstore(cbs)
|
||||
cst := cbor.NewCborStore(buf)
|
||||
state, err := state.LoadStateTree(cst, base)
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
)
|
||||
|
||||
func NewStore(ctx context.Context, cst *cbor.BasicIpldStore) adt.Store {
|
||||
func NewStore(ctx context.Context, cst cbor.IpldStore) adt.Store {
|
||||
return &store{
|
||||
cst: cst,
|
||||
ctx: ctx,
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
storage2 "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -57,6 +56,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/paychmgr"
|
||||
"github.com/filecoin-project/lotus/paychmgr/settler"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
sectorstorage "github.com/filecoin-project/sector-storage"
|
||||
@ -118,6 +118,7 @@ const (
|
||||
// daemon
|
||||
ExtractApiKey
|
||||
HeadMetricsKey
|
||||
SettlePaymentChannelsKey
|
||||
RunPeerTaggerKey
|
||||
JournalKey
|
||||
|
||||
@ -225,7 +226,7 @@ func Online() Option {
|
||||
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
|
||||
|
||||
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
|
||||
Override(new(runtime.Syscalls), vm.Syscalls),
|
||||
Override(new(vm.SyscallBuilder), vm.Syscalls),
|
||||
Override(new(*store.ChainStore), modules.ChainStore),
|
||||
Override(new(*stmgr.StateManager), stmgr.NewStateManager),
|
||||
Override(new(*wallet.Wallet), wallet.NewWallet),
|
||||
@ -272,6 +273,7 @@ func Online() Option {
|
||||
Override(new(*paychmgr.Store), paychmgr.NewStore),
|
||||
Override(new(*paychmgr.Manager), paychmgr.NewManager),
|
||||
Override(new(*market.FundMgr), market.NewFundMgr),
|
||||
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
|
||||
),
|
||||
|
||||
// miner
|
||||
|
@ -107,36 +107,43 @@ func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.P
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Cid, error) {
|
||||
panic("TODO Settle logic")
|
||||
func (a *PaychAPI) PaychSettle(ctx context.Context, addr address.Address) (cid.Cid, error) {
|
||||
|
||||
ci, err := a.PaychMgr.GetChannelInfo(addr)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
nonce, err := a.MpoolGetNonce(ctx, ci.Control)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
To: addr,
|
||||
From: ci.Control,
|
||||
Value: types.NewInt(0),
|
||||
Method: builtin.MethodsPaych.Settle,
|
||||
Nonce: nonce,
|
||||
|
||||
GasLimit: 0,
|
||||
GasPrice: types.NewInt(0),
|
||||
}
|
||||
smgs, err := a.MpoolPushMessage(ctx, msg)
|
||||
|
||||
smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
return smgs.Cid(), nil
|
||||
}
|
||||
|
||||
func (a *PaychAPI) PaychCollect(ctx context.Context, addr address.Address) (cid.Cid, error) {
|
||||
|
||||
ci, err := a.PaychMgr.GetChannelInfo(addr)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if _, err := a.MpoolPush(ctx, smsg); err != nil {
|
||||
msg := &types.Message{
|
||||
To: addr,
|
||||
From: ci.Control,
|
||||
Value: types.NewInt(0),
|
||||
Method: builtin.MethodsPaych.Collect,
|
||||
}
|
||||
|
||||
smsg, err := a.MpoolPushMessage(ctx, msg)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
@ -219,11 +226,6 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
nonce, err := a.MpoolGetNonce(ctx, ci.Control)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if sv.Extra != nil || len(sv.SecretPreimage) > 0 {
|
||||
return cid.Undef, fmt.Errorf("cant handle more advanced payment channel stuff yet")
|
||||
}
|
||||
@ -236,25 +238,17 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
|
||||
}
|
||||
|
||||
msg := &types.Message{
|
||||
From: ci.Control,
|
||||
To: ch,
|
||||
Value: types.NewInt(0),
|
||||
Nonce: nonce,
|
||||
Method: builtin.MethodsPaych.UpdateChannelState,
|
||||
Params: enc,
|
||||
GasLimit: 0,
|
||||
GasPrice: types.NewInt(0),
|
||||
From: ci.Control,
|
||||
To: ch,
|
||||
Value: types.NewInt(0),
|
||||
Method: builtin.MethodsPaych.UpdateChannelState,
|
||||
Params: enc,
|
||||
}
|
||||
|
||||
smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
|
||||
smsg, err := a.MpoolPushMessage(ctx, msg)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if _, err := a.MpoolPush(ctx, smsg); err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
// TODO: should we wait for it...?
|
||||
return smsg.Cid(), nil
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package modules
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
@ -17,7 +18,6 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/beacon"
|
||||
@ -83,7 +83,7 @@ func ChainBlockservice(bs dtypes.ChainBlockstore, rem dtypes.ChainExchange) dtyp
|
||||
return blockservice.New(bs, rem)
|
||||
}
|
||||
|
||||
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls runtime.Syscalls) *store.ChainStore {
|
||||
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder) *store.ChainStore {
|
||||
chain := store.NewChainStore(bs, ds, syscalls)
|
||||
|
||||
if err := chain.Load(); err != nil {
|
||||
|
@ -17,12 +17,11 @@ import (
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/gen"
|
||||
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -30,8 +29,8 @@ import (
|
||||
|
||||
var glog = logging.Logger("genesis")
|
||||
|
||||
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls runtime.Syscalls) modules.Genesis {
|
||||
return func(bs dtypes.ChainBlockstore, syscalls runtime.Syscalls) modules.Genesis {
|
||||
func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
||||
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
||||
return func() (*types.BlockHeader, error) {
|
||||
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
||||
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)
|
||||
@ -51,8 +50,8 @@ func MakeGenesisMem(out io.Writer, template genesis.Template) func(bs dtypes.Cha
|
||||
}
|
||||
}
|
||||
|
||||
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls runtime.Syscalls) modules.Genesis {
|
||||
return func(bs dtypes.ChainBlockstore, syscalls runtime.Syscalls) modules.Genesis {
|
||||
func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
||||
return func(bs dtypes.ChainBlockstore, syscalls vm.SyscallBuilder) modules.Genesis {
|
||||
return func() (*types.BlockHeader, error) {
|
||||
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
|
||||
genesisTemplate, err := homedir.Expand(genesisTemplate)
|
||||
|
@ -546,3 +546,14 @@ func TestCCUpgrade(t *testing.T) {
|
||||
|
||||
test.TestCCUpgrade(t, mockSbBuilder, 5*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPaymentChannels(t *testing.T) {
|
||||
logging.SetLogLevel("miner", "ERROR")
|
||||
logging.SetLogLevel("chainstore", "ERROR")
|
||||
logging.SetLogLevel("chain", "ERROR")
|
||||
logging.SetLogLevel("sub", "ERROR")
|
||||
logging.SetLogLevel("pubsub", "ERROR")
|
||||
logging.SetLogLevel("storageminer", "ERROR")
|
||||
|
||||
test.TestPaymentChannels(t, mockSbBuilder, 5*time.Millisecond)
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv
|
||||
|
||||
// CheckVoucherSpendable checks if the given voucher is currently spendable
|
||||
func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, secret []byte, proof []byte) (bool, error) {
|
||||
owner, err := pm.getPaychOwner(ctx, ch)
|
||||
recipient, err := pm.getPaychRecipient(ctx, ch)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -222,7 +222,7 @@ func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address
|
||||
}
|
||||
|
||||
ret, err := pm.sm.Call(ctx, &types.Message{
|
||||
From: owner,
|
||||
From: recipient,
|
||||
To: ch,
|
||||
Method: builtin.MethodsPaych.UpdateChannelState,
|
||||
Params: enc,
|
||||
@ -238,13 +238,13 @@ func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (pm *Manager) getPaychOwner(ctx context.Context, ch address.Address) (address.Address, error) {
|
||||
func (pm *Manager) getPaychRecipient(ctx context.Context, ch address.Address) (address.Address, error) {
|
||||
var state paych.State
|
||||
if _, err := pm.sm.LoadActorState(ctx, ch, &state, nil); err != nil {
|
||||
return address.Address{}, err
|
||||
}
|
||||
|
||||
return state.From, nil
|
||||
return state.To, nil
|
||||
}
|
||||
|
||||
func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
|
||||
|
141
paychmgr/settler/settler.go
Normal file
141
paychmgr/settler/settler.go
Normal file
@ -0,0 +1,141 @@
|
||||
package settler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||
)
|
||||
|
||||
var log = logging.Logger("payment-channel-settler")
|
||||
|
||||
// API are the dependencies need to run the payment channel settler
|
||||
type API struct {
|
||||
fx.In
|
||||
|
||||
full.ChainAPI
|
||||
full.StateAPI
|
||||
payapi.PaychAPI
|
||||
}
|
||||
|
||||
type settlerAPI interface {
|
||||
PaychList(context.Context) ([]address.Address, error)
|
||||
PaychStatus(context.Context, address.Address) (*api.PaychStatus, error)
|
||||
PaychVoucherCheckSpendable(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (bool, error)
|
||||
PaychVoucherList(context.Context, address.Address) ([]*paych.SignedVoucher, error)
|
||||
PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher) (cid.Cid, error)
|
||||
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error)
|
||||
}
|
||||
|
||||
type paymentChannelSettler struct {
|
||||
ctx context.Context
|
||||
api settlerAPI
|
||||
}
|
||||
|
||||
// SettlePaymentChannels checks the chain for events related to payment channels settling and
|
||||
// submits any vouchers for inbound channels tracked for this node
|
||||
func SettlePaymentChannels(lc fx.Lifecycle, api API) error {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
pcs := newPaymentChannelSettler(ctx, &api)
|
||||
ev := events.NewEvents(ctx, &api)
|
||||
return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)
|
||||
},
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func newPaymentChannelSettler(ctx context.Context, api settlerAPI) *paymentChannelSettler {
|
||||
return &paymentChannelSettler{
|
||||
ctx: ctx,
|
||||
api: api,
|
||||
}
|
||||
}
|
||||
|
||||
func (pcs *paymentChannelSettler) check(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
return false, true, nil
|
||||
}
|
||||
|
||||
func (pcs *paymentChannelSettler) messageHandler(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) {
|
||||
vouchers, err := pcs.api.PaychVoucherList(pcs.ctx, msg.To)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
bestByLane := make(map[uint64]*paych.SignedVoucher)
|
||||
for _, voucher := range vouchers {
|
||||
spendable, err := pcs.api.PaychVoucherCheckSpendable(pcs.ctx, msg.To, voucher, nil, nil)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if spendable {
|
||||
if bestByLane[voucher.Lane] == nil || voucher.Amount.GreaterThan(bestByLane[voucher.Lane].Amount) {
|
||||
bestByLane[voucher.Lane] = voucher
|
||||
}
|
||||
}
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(bestByLane))
|
||||
for _, voucher := range bestByLane {
|
||||
submitMessageCID, err := pcs.api.PaychVoucherSubmit(pcs.ctx, msg.To, voucher)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
go func(voucher *paych.SignedVoucher, submitMessageCID cid.Cid) {
|
||||
defer wg.Done()
|
||||
msgLookup, err := pcs.api.StateWaitMsg(pcs.ctx, submitMessageCID, build.MessageConfidence)
|
||||
if err != nil {
|
||||
log.Errorf("submitting voucher: %s", err.Error())
|
||||
}
|
||||
if msgLookup.Receipt.ExitCode != 0 {
|
||||
log.Errorf("failed submitting voucher: %+v", voucher)
|
||||
}
|
||||
}(voucher, submitMessageCID)
|
||||
}
|
||||
wg.Wait()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (pcs *paymentChannelSettler) revertHandler(ctx context.Context, ts *types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pcs *paymentChannelSettler) matcher(msg *types.Message) (matchOnce bool, matched bool, err error) {
|
||||
// Check if this is a settle payment channel message
|
||||
if msg.Method != builtin.MethodsPaych.Settle {
|
||||
return false, false, nil
|
||||
}
|
||||
// Check if this payment channel is of concern to this node (i.e. tracked in payment channel store),
|
||||
// and its inbound (i.e. we're getting vouchers that we may need to redeem)
|
||||
trackedAddresses, err := pcs.api.PaychList(pcs.ctx)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
for _, addr := range trackedAddresses {
|
||||
if msg.To == addr {
|
||||
status, err := pcs.api.PaychStatus(pcs.ctx, addr)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
if status.Direction == api.PCHInbound {
|
||||
return false, true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, false, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user