refactor: paych - use channel accessor from/to instead of passing them around as params
This commit is contained in:
parent
1d3a21f6c8
commit
8423325a6e
@ -138,7 +138,7 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt t
|
|||||||
return address.Undef, cid.Undef, err
|
return address.Undef, cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return chanAccessor.getPaych(ctx, from, to, amt)
|
return chanAccessor.getPaych(ctx, amt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *Manager) AvailableFunds(from address.Address, to address.Address) (*api.ChannelAvailableFunds, error) {
|
func (pm *Manager) AvailableFunds(from address.Address, to address.Address) (*api.ChannelAvailableFunds, error) {
|
||||||
|
@ -36,8 +36,6 @@ type paychFundsRes struct {
|
|||||||
type fundsReq struct {
|
type fundsReq struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
promise chan *paychFundsRes
|
promise chan *paychFundsRes
|
||||||
from address.Address
|
|
||||||
to address.Address
|
|
||||||
amt types.BigInt
|
amt types.BigInt
|
||||||
|
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
@ -47,13 +45,11 @@ type fundsReq struct {
|
|||||||
active bool
|
active bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFundsReq(ctx context.Context, from address.Address, to address.Address, amt types.BigInt) *fundsReq {
|
func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq {
|
||||||
promise := make(chan *paychFundsRes)
|
promise := make(chan *paychFundsRes)
|
||||||
return &fundsReq{
|
return &fundsReq{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
promise: promise,
|
promise: promise,
|
||||||
from: from,
|
|
||||||
to: to,
|
|
||||||
amt: amt,
|
amt: amt,
|
||||||
active: true,
|
active: true,
|
||||||
}
|
}
|
||||||
@ -150,14 +146,6 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mergedFundsReq) from() address.Address {
|
|
||||||
return m.reqs[0].from
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mergedFundsReq) to() address.Address {
|
|
||||||
return m.reqs[0].to
|
|
||||||
}
|
|
||||||
|
|
||||||
// sum is the sum of the amounts in all requests in the merge
|
// sum is the sum of the amounts in all requests in the merge
|
||||||
func (m *mergedFundsReq) sum() types.BigInt {
|
func (m *mergedFundsReq) sum() types.BigInt {
|
||||||
sum := types.NewInt(0)
|
sum := types.NewInt(0)
|
||||||
@ -180,9 +168,9 @@ func (m *mergedFundsReq) sum() types.BigInt {
|
|||||||
// address and the CID of the new add funds message.
|
// address and the CID of the new add funds message.
|
||||||
// If an operation returns an error, subsequent waiting operations will still
|
// If an operation returns an error, subsequent waiting operations will still
|
||||||
// be attempted.
|
// be attempted.
|
||||||
func (ca *channelAccessor) getPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
|
func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) {
|
||||||
// Add the request to add funds to a queue and wait for the result
|
// Add the request to add funds to a queue and wait for the result
|
||||||
freq := newFundsReq(ctx, from, to, amt)
|
freq := newFundsReq(ctx, amt)
|
||||||
ca.enqueue(freq)
|
ca.enqueue(freq)
|
||||||
select {
|
select {
|
||||||
case res := <-freq.promise:
|
case res := <-freq.promise:
|
||||||
@ -226,7 +214,7 @@ func (ca *channelAccessor) processQueue() (*api.ChannelAvailableFunds, error) {
|
|||||||
return ca.currentAvailableFunds(amt)
|
return ca.currentAvailableFunds(amt)
|
||||||
}
|
}
|
||||||
|
|
||||||
res := ca.processTask(merged.ctx, merged.from(), merged.to(), amt)
|
res := ca.processTask(merged.ctx, amt)
|
||||||
|
|
||||||
// If the task is waiting on an external event (eg something to appear on
|
// If the task is waiting on an external event (eg something to appear on
|
||||||
// chain) it will return nil
|
// chain) it will return nil
|
||||||
@ -360,23 +348,18 @@ func (ca *channelAccessor) currentAvailableFunds(queuedAmt types.BigInt) (*api.C
|
|||||||
// Note that processTask may be called repeatedly in the same state, and should
|
// Note that processTask may be called repeatedly in the same state, and should
|
||||||
// return nil if there is no state change to be made (eg when waiting for a
|
// return nil if there is no state change to be made (eg when waiting for a
|
||||||
// message to be confirmed on chain)
|
// message to be confirmed on chain)
|
||||||
func (ca *channelAccessor) processTask(
|
func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes {
|
||||||
ctx context.Context,
|
|
||||||
from address.Address,
|
|
||||||
to address.Address,
|
|
||||||
amt types.BigInt,
|
|
||||||
) *paychFundsRes {
|
|
||||||
// Get the payment channel for the from/to addresses.
|
// Get the payment channel for the from/to addresses.
|
||||||
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
|
// Note: It's ok if we get ErrChannelNotTracked. It just means we need to
|
||||||
// create a channel.
|
// create a channel.
|
||||||
channelInfo, err := ca.store.OutboundActiveByFromTo(from, to)
|
channelInfo, err := ca.store.OutboundActiveByFromTo(ca.from, ca.to)
|
||||||
if err != nil && err != ErrChannelNotTracked {
|
if err != nil && err != ErrChannelNotTracked {
|
||||||
return &paychFundsRes{err: err}
|
return &paychFundsRes{err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If a channel has not yet been created, create one.
|
// If a channel has not yet been created, create one.
|
||||||
if channelInfo == nil {
|
if channelInfo == nil {
|
||||||
mcid, err := ca.createPaych(ctx, from, to, amt)
|
mcid, err := ca.createPaych(ctx, amt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &paychFundsRes{err: err}
|
return &paychFundsRes{err: err}
|
||||||
}
|
}
|
||||||
@ -408,8 +391,8 @@ func (ca *channelAccessor) processTask(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createPaych sends a message to create the channel and returns the message cid
|
// createPaych sends a message to create the channel and returns the message cid
|
||||||
func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (cid.Cid, error) {
|
func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) {
|
||||||
params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: from, To: to})
|
params, aerr := actors.SerializeParams(&paych.ConstructorParams{From: ca.from, To: ca.to})
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return cid.Undef, aerr
|
return cid.Undef, aerr
|
||||||
}
|
}
|
||||||
@ -424,7 +407,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
|
|||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
To: builtin.InitActorAddr,
|
To: builtin.InitActorAddr,
|
||||||
From: from,
|
From: ca.from,
|
||||||
Value: amt,
|
Value: amt,
|
||||||
Method: builtin.MethodsInit.Exec,
|
Method: builtin.MethodsInit.Exec,
|
||||||
Params: enc,
|
Params: enc,
|
||||||
@ -437,7 +420,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, from, to address.Add
|
|||||||
mcid := smsg.Cid()
|
mcid := smsg.Cid()
|
||||||
|
|
||||||
// Create a new channel in the store
|
// Create a new channel in the store
|
||||||
ci, err := ca.store.CreateChannel(from, to, mcid, amt)
|
ci, err := ca.store.CreateChannel(ca.from, ca.to, mcid, amt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("creating channel: %s", err)
|
log.Errorf("creating channel: %s", err)
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
|
Loading…
Reference in New Issue
Block a user