begin implementing funcs to support latest go-fil-markest retrieval market API

This commit is contained in:
shannonwells 2020-04-16 17:25:06 -07:00
parent f960cccc51
commit fd7f4ef080
3 changed files with 37 additions and 23 deletions

2
go.mod
View File

@ -19,7 +19,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6 github.com/filecoin-project/go-data-transfer v0.0.0-20200408061858-82c58b423ca6
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-fil-markets v0.0.0-20200413201123-731e6ca89984 github.com/filecoin-project/go-fil-markets v0.0.0-20200415011556-4378bd41b91f
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-statestore v0.1.0

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych" payapi "github.com/filecoin-project/lotus/node/impl/paych"
@ -29,11 +30,10 @@ func NewRetrievalClientNode(pmgr *paychmgr.Manager, payapi payapi.PaychAPI, chai
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
// between a client and a miner and ensures the client has the given amount of // between a client and a miner and ensures the client has the given amount of
// funds available in the channel. // funds available in the channel.
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, error) { func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when // TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain // querying the chain
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) return rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
return paych, err
} }
// Allocate late creates a lane within a payment channel so that calls to // Allocate late creates a lane within a payment channel so that calls to
@ -64,3 +64,12 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet
return head.Key().Bytes(), head.Height(), nil return head.Key().Bytes(), head.Height(), nil
} }
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
panic("implement me")
}
func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
panic("implement me")
}

View File

@ -17,10 +17,10 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (cid.Cid, error) {
params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to}) params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to})
if aerr != nil { if aerr != nil {
return address.Undef, cid.Undef, aerr return cid.Undef, aerr
} }
enc, aerr := actors.SerializeParams(&init_.ExecParams{ enc, aerr := actors.SerializeParams(&init_.ExecParams{
@ -28,7 +28,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
ConstructorParams: params, ConstructorParams: params,
}) })
if aerr != nil { if aerr != nil {
return address.Undef, cid.Undef, aerr return cid.Undef, aerr
} }
msg := &types.Message{ msg := &types.Message{
@ -43,42 +43,40 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg) smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("initializing paych actor: %w", err) return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
} }
mcid := smsg.Cid() return smsg.Cid(), nil
}
// TODO: wait outside the store lock! func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) (address.Address, error) {
// (tricky because we need to setup channel tracking before we know it's address)
mwait, err := pm.state.StateWaitMsg(ctx, mcid) mwait, err := pm.state.StateWaitMsg(ctx, mcid)
if err != nil { if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("wait msg: %w", err) return address.Undef, xerrors.Errorf("wait msg: %w", err)
} }
if mwait.Receipt.ExitCode != 0 { if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) return address.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
} }
var decodedReturn init_.ExecReturn var decodedReturn init_.ExecReturn
err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return)) err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return))
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return address.Undef, err
} }
paychaddr := decodedReturn.RobustAddress paychaddr := decodedReturn.RobustAddress
ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr) ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr)
if err != nil { if err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("loading channel info: %w", err) return address.Undef, xerrors.Errorf("loading channel info: %w", err)
} }
if err := pm.store.trackChannel(ci); err != nil { if err := pm.store.trackChannel(ci); err != nil {
return address.Undef, cid.Undef, xerrors.Errorf("tracking channel: %w", err) return address.Undef, xerrors.Errorf("tracking channel: %w", err)
}
} }
return paychaddr, mcid, nil func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) (cid.Cid, error) {
}
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
msg := &types.Message{ msg := &types.Message{
To: ch, To: ch,
From: from, From: from,
@ -90,10 +88,14 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg) smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return err return cid.Undef, err
}
return smsg.Cid(), nil
} }
mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock! func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) error {
mwait, err := pm.state.StateWaitMsg(ctx, mcid) // TODO: wait outside the store lock!
if err != nil { if err != nil {
return err return err
} }
@ -109,6 +111,7 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensur
pm.store.lk.Lock() pm.store.lk.Lock()
defer pm.store.lk.Unlock() defer pm.store.lk.Unlock()
var mcid cid.Cid
ch, err := pm.store.findChan(func(ci *ChannelInfo) bool { ch, err := pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound { if ci.Direction != DirOutbound {
return false return false
@ -120,8 +123,10 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensur
} }
if ch != address.Undef { if ch != address.Undef {
// TODO: Track available funds // TODO: Track available funds
return ch, cid.Undef, pm.addFunds(ctx, ch, from, ensureFree) mcid, err = pm.addFunds(ctx, ch, from, ensureFree)
} else {
mcid, err = pm.createPaych(ctx, from, to, ensureFree)
} }
return pm.createPaych(ctx, from, to, ensureFree) return ch, mcid, err
} }