separate lock/unlock and do independent waiting per PR comments
This commit is contained in:
parent
a9c98eab8d
commit
28c5697578
@ -1,14 +1,18 @@
|
||||
package retrievaladapter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
initactor "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||
@ -65,13 +69,32 @@ func (rcn *retrievalClientNode) GetChainHead(ctx context.Context) (shared.TipSet
|
||||
return head.Key().Bytes(), head.Height(), nil
|
||||
}
|
||||
|
||||
// WaitForPaymentChannelAddFunds waits for messageCID to appear on chain.
|
||||
// WaitForPaymentChannelAddFunds waits messageCID to appear on chain. If it doesn't appear within
|
||||
// defaultMsgWaitTimeout it returns error
|
||||
func (rcn *retrievalClientNode) WaitForPaymentChannelAddFunds(messageCID cid.Cid) error {
|
||||
return rcn.pmgr.WaitForAddFundsMsg(context.TODO(), messageCID)
|
||||
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if mr.ExitCode != exitcode.Ok {
|
||||
return xerrors.Errorf("wait for payment channel to add funds failed. exit code: %d", mr.ExitCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForPaymentChannelCreation waits for messageCID to appear on chain and returns
|
||||
// the address of the payment channel
|
||||
func (rcn *retrievalClientNode) WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error) {
|
||||
return rcn.pmgr.WaitForPaychCreateMsg(context.TODO(), messageCID)
|
||||
_, mr, err := rcn.chainapi.StateManager.WaitForMessage(context.TODO(), messageCID)
|
||||
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
if mr.ExitCode != exitcode.Ok {
|
||||
return address.Undef, xerrors.Errorf("payment channel creation failed. exit code: %d", mr.ExitCode)
|
||||
}
|
||||
var retval initactor.ExecReturn
|
||||
if err := retval.UnmarshalCBOR(bytes.NewReader(mr.Return)); err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
return retval.RobustAddress, nil
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package paychmgr
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||
@ -45,39 +44,41 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("initializing paych actor: %w", err)
|
||||
}
|
||||
|
||||
return smsg.Cid(), nil
|
||||
mcid := smsg.Cid()
|
||||
go pm.waitForPaychCreateMsg(ctx, mcid)
|
||||
return mcid, nil
|
||||
}
|
||||
|
||||
// WaitForPaychCreateMsg waits for mcid to appear on chain and returns the robust address of the
|
||||
// created payment channel
|
||||
func (pm *Manager) WaitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) (address.Address, error) {
|
||||
// TODO: wait outside the store lock!
|
||||
// (tricky because we need to setup channel tracking before we know its address)
|
||||
func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) {
|
||||
defer pm.store.lk.Unlock()
|
||||
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("wait msg: %w", err)
|
||||
log.Errorf("wait msg: %w", err)
|
||||
}
|
||||
|
||||
if mwait.Receipt.ExitCode != 0 {
|
||||
return address.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
|
||||
log.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
var decodedReturn init_.ExecReturn
|
||||
err = decodedReturn.UnmarshalCBOR(bytes.NewReader(mwait.Receipt.Return))
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
log.Error(err)
|
||||
}
|
||||
paychaddr := decodedReturn.RobustAddress
|
||||
|
||||
ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr)
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("loading channel info: %w", err)
|
||||
log.Errorf("loading channel info: %w", err)
|
||||
}
|
||||
|
||||
if err := pm.store.trackChannel(ci); err != nil {
|
||||
return address.Undef, xerrors.Errorf("tracking channel: %w", err)
|
||||
log.Errorf("tracking channel: %w", err)
|
||||
}
|
||||
|
||||
return paychaddr, nil
|
||||
}
|
||||
|
||||
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) (cid.Cid, error) {
|
||||
@ -94,28 +95,28 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
return smsg.Cid(), nil
|
||||
mcid := smsg.Cid()
|
||||
go pm.waitForAddFundsMsg(ctx, mcid)
|
||||
return mcid, nil
|
||||
}
|
||||
|
||||
// WaitForAddFundsMsg waits for mcid to appear on chain and returns error, if any
|
||||
func (pm *Manager) WaitForAddFundsMsg(ctx context.Context, mcid cid.Cid) error {
|
||||
|
||||
mwait, err := pm.state.StateWaitMsg(ctx, mcid) // TODO: wait outside the store lock!
|
||||
// TODO: wait outside the store lock!
|
||||
// (tricky because we need to setup channel tracking before we know it's address)
|
||||
func (pm *Manager) waitForAddFundsMsg(ctx context.Context, mcid cid.Cid) {
|
||||
defer pm.store.lk.Unlock()
|
||||
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
if mwait.Receipt.ExitCode != 0 {
|
||||
return fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
|
||||
log.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, ensureFree types.BigInt) (address.Address, cid.Cid, error) {
|
||||
pm.store.lk.Lock()
|
||||
defer pm.store.lk.Unlock()
|
||||
|
||||
var mcid cid.Cid
|
||||
ch, err := pm.store.findChan(func(ci *ChannelInfo) bool {
|
||||
if ci.Direction != DirOutbound {
|
||||
|
Loading…
Reference in New Issue
Block a user