Merge pull request #2834 from filecoin-project/feat/fund-manager-fix
Improve Fund Manager
This commit is contained in:
commit
c6acfa624e
@ -10,6 +10,7 @@ import (
|
|||||||
typegen "github.com/whyrusleeping/cbor-gen"
|
typegen "github.com/whyrusleeping/cbor-gen"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
@ -86,6 +87,50 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BalanceTables struct {
|
||||||
|
EscrowTable *adt.BalanceTable
|
||||||
|
LockedTable *adt.BalanceTable
|
||||||
|
}
|
||||||
|
|
||||||
|
// DiffBalanceTablesFunc compares two balance tables
|
||||||
|
type DiffBalanceTablesFunc func(ctx context.Context, oldBalanceTable, newBalanceTable BalanceTables) (changed bool, user UserData, err error)
|
||||||
|
|
||||||
|
// OnBalanceChanged runs when the escrow table for available balances changes
|
||||||
|
func (sp *StatePredicates) OnBalanceChanged(diffBalances DiffBalanceTablesFunc) DiffStorageMarketStateFunc {
|
||||||
|
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
|
||||||
|
if oldState.EscrowTable.Equals(newState.EscrowTable) && oldState.LockedTable.Equals(newState.LockedTable) {
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctxStore := &contextStore{
|
||||||
|
ctx: ctx,
|
||||||
|
cst: sp.cst,
|
||||||
|
}
|
||||||
|
|
||||||
|
oldEscrowRoot, err := adt.AsBalanceTable(ctxStore, oldState.EscrowTable)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oldLockedRoot, err := adt.AsBalanceTable(ctxStore, oldState.LockedTable)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newEscrowRoot, err := adt.AsBalanceTable(ctxStore, newState.EscrowTable)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newLockedRoot, err := adt.AsBalanceTable(ctxStore, newState.LockedTable)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return diffBalances(ctx, BalanceTables{oldEscrowRoot, oldLockedRoot}, BalanceTables{newEscrowRoot, newLockedRoot})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error)
|
type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error)
|
||||||
|
|
||||||
// OnDealStateChanged calls diffDealStates when the market deal state changes
|
// OnDealStateChanged calls diffDealStates when the market deal state changes
|
||||||
@ -309,6 +354,57 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtA
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChangedBalances is a set of changes to deal state
|
||||||
|
type ChangedBalances map[address.Address]BalanceChange
|
||||||
|
|
||||||
|
// BalanceChange is a change in balance from -> to
|
||||||
|
type BalanceChange struct {
|
||||||
|
From abi.TokenAmount
|
||||||
|
To abi.TokenAmount
|
||||||
|
}
|
||||||
|
|
||||||
|
// AvailableBalanceChangedForAddresses detects changes in the escrow table for the given addresses
|
||||||
|
func (sp *StatePredicates) AvailableBalanceChangedForAddresses(getAddrs func() []address.Address) DiffBalanceTablesFunc {
|
||||||
|
return func(ctx context.Context, oldBalances, newBalances BalanceTables) (changed bool, user UserData, err error) {
|
||||||
|
changedBalances := make(ChangedBalances)
|
||||||
|
addrs := getAddrs()
|
||||||
|
for _, addr := range addrs {
|
||||||
|
// If the deal has been removed, we just set it to nil
|
||||||
|
oldEscrowBalance, err := oldBalances.EscrowTable.Get(addr)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oldLockedBalance, err := oldBalances.LockedTable.Get(addr)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oldBalance := big.Sub(oldEscrowBalance, oldLockedBalance)
|
||||||
|
|
||||||
|
newEscrowBalance, err := newBalances.EscrowTable.Get(addr)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newLockedBalance, err := newBalances.LockedTable.Get(addr)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newBalance := big.Sub(newEscrowBalance, newLockedBalance)
|
||||||
|
|
||||||
|
if !oldBalance.Equals(newBalance) {
|
||||||
|
changedBalances[addr] = BalanceChange{oldBalance, newBalance}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(changedBalances) > 0 {
|
||||||
|
return true, changedBalances, nil
|
||||||
|
}
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
|
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
|
||||||
|
|
||||||
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
|
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
|
||||||
|
@ -2,9 +2,10 @@ package state
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/go-bitfield"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-bitfield"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -112,7 +113,14 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
abi.DealID(2): oldProp2,
|
abi.DealID(2): oldProp2,
|
||||||
}
|
}
|
||||||
|
|
||||||
oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps)
|
oldBalances := map[address.Address]balance{
|
||||||
|
tutils.NewIDAddr(t, 1): balance{abi.NewTokenAmount(1000), abi.NewTokenAmount(1000)},
|
||||||
|
tutils.NewIDAddr(t, 2): balance{abi.NewTokenAmount(2000), abi.NewTokenAmount(500)},
|
||||||
|
tutils.NewIDAddr(t, 3): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(2000)},
|
||||||
|
tutils.NewIDAddr(t, 5): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(1000)},
|
||||||
|
}
|
||||||
|
|
||||||
|
oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps, oldBalances)
|
||||||
|
|
||||||
newDeal1 := &market.DealState{
|
newDeal1 := &market.DealState{
|
||||||
SectorStartEpoch: 1,
|
SectorStartEpoch: 1,
|
||||||
@ -153,7 +161,14 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
abi.DealID(3): newProp3, // new
|
abi.DealID(3): newProp3, // new
|
||||||
// NB: DealProposals cannot be modified, so don't test that case.
|
// NB: DealProposals cannot be modified, so don't test that case.
|
||||||
}
|
}
|
||||||
newStateC := createMarketState(ctx, t, store, newDeals, newProps)
|
newBalances := map[address.Address]balance{
|
||||||
|
tutils.NewIDAddr(t, 1): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(0)},
|
||||||
|
tutils.NewIDAddr(t, 2): balance{abi.NewTokenAmount(2000), abi.NewTokenAmount(500)},
|
||||||
|
tutils.NewIDAddr(t, 4): balance{abi.NewTokenAmount(5000), abi.NewTokenAmount(0)},
|
||||||
|
tutils.NewIDAddr(t, 5): balance{abi.NewTokenAmount(1000), abi.NewTokenAmount(3000)},
|
||||||
|
}
|
||||||
|
|
||||||
|
newStateC := createMarketState(ctx, t, store, newDeals, newProps, newBalances)
|
||||||
|
|
||||||
minerAddr, err := address.NewFromString("t00")
|
minerAddr, err := address.NewFromString("t00")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -276,6 +291,63 @@ func TestMarketPredicates(t *testing.T) {
|
|||||||
require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID)
|
require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID)
|
||||||
require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal)
|
require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("balances predicate", func(t *testing.T) {
|
||||||
|
preds := NewStatePredicates(api)
|
||||||
|
|
||||||
|
getAddresses := func() []address.Address {
|
||||||
|
return []address.Address{tutils.NewIDAddr(t, 1), tutils.NewIDAddr(t, 2), tutils.NewIDAddr(t, 3), tutils.NewIDAddr(t, 4)}
|
||||||
|
}
|
||||||
|
diffBalancesFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(getAddresses)))
|
||||||
|
|
||||||
|
// Diff a state against itself: expect no change
|
||||||
|
changed, _, err := diffBalancesFn(ctx, oldState.Key(), oldState.Key())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, changed)
|
||||||
|
|
||||||
|
// Diff old state against new state
|
||||||
|
changed, valIDs, err := diffBalancesFn(ctx, oldState.Key(), newState.Key())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, changed)
|
||||||
|
|
||||||
|
changedBalances, ok := valIDs.(ChangedBalances)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Len(t, changedBalances, 3)
|
||||||
|
require.Contains(t, changedBalances, tutils.NewIDAddr(t, 1))
|
||||||
|
require.Contains(t, changedBalances, tutils.NewIDAddr(t, 3))
|
||||||
|
require.Contains(t, changedBalances, tutils.NewIDAddr(t, 4))
|
||||||
|
|
||||||
|
balance1 := changedBalances[tutils.NewIDAddr(t, 1)]
|
||||||
|
if !balance1.From.Equals(abi.NewTokenAmount(1000)) || !balance1.To.Equals(abi.NewTokenAmount(3000)) {
|
||||||
|
t.Fatal("Unexpected change to balance")
|
||||||
|
}
|
||||||
|
balance3 := changedBalances[tutils.NewIDAddr(t, 3)]
|
||||||
|
if !balance3.From.Equals(abi.NewTokenAmount(3000)) || !balance3.To.Equals(abi.NewTokenAmount(0)) {
|
||||||
|
t.Fatal("Unexpected change to balance")
|
||||||
|
}
|
||||||
|
balance4 := changedBalances[tutils.NewIDAddr(t, 4)]
|
||||||
|
if !balance4.From.Equals(abi.NewTokenAmount(0)) || !balance4.To.Equals(abi.NewTokenAmount(5000)) {
|
||||||
|
t.Fatal("Unexpected change to balance")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Diff with non-existent address.
|
||||||
|
getNoAddress := func() []address.Address { return []address.Address{tutils.NewIDAddr(t, 6)} }
|
||||||
|
diffNoAddressFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(getNoAddress)))
|
||||||
|
changed, _, err = diffNoAddressFn(ctx, oldState.Key(), newState.Key())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, changed)
|
||||||
|
|
||||||
|
// Test that OnBalanceChanged does not call the callback if the state has not changed
|
||||||
|
diffDealBalancesFn := preds.OnBalanceChanged(func(context.Context, BalanceTables, BalanceTables) (bool, UserData, error) {
|
||||||
|
t.Fatal("No state change so this should not be called")
|
||||||
|
return false, nil, nil
|
||||||
|
})
|
||||||
|
marketState := createEmptyMarketState(t, store)
|
||||||
|
changed, _, err = diffDealBalancesFn(ctx, marketState, marketState)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, changed)
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMinerSectorChange(t *testing.T) {
|
func TestMinerSectorChange(t *testing.T) {
|
||||||
@ -373,13 +445,20 @@ func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, err
|
|||||||
}})
|
}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal) cid.Cid {
|
type balance struct {
|
||||||
|
available abi.TokenAmount
|
||||||
|
locked abi.TokenAmount
|
||||||
|
}
|
||||||
|
|
||||||
|
func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal, balances map[address.Address]balance) cid.Cid {
|
||||||
dealRootCid := createDealAMT(ctx, t, store, deals)
|
dealRootCid := createDealAMT(ctx, t, store, deals)
|
||||||
propRootCid := createProposalAMT(ctx, t, store, props)
|
propRootCid := createProposalAMT(ctx, t, store, props)
|
||||||
|
balancesCids := createBalanceTable(ctx, t, store, balances)
|
||||||
state := createEmptyMarketState(t, store)
|
state := createEmptyMarketState(t, store)
|
||||||
state.States = dealRootCid
|
state.States = dealRootCid
|
||||||
state.Proposals = propRootCid
|
state.Proposals = propRootCid
|
||||||
|
state.EscrowTable = balancesCids[0]
|
||||||
|
state.LockedTable = balancesCids[1]
|
||||||
|
|
||||||
stateC, err := store.Put(ctx, state)
|
stateC, err := store.Put(ctx, state)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -416,6 +495,31 @@ func createProposalAMT(ctx context.Context, t *testing.T, store adt.Store, props
|
|||||||
return rootCid
|
return rootCid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createBalanceTable(ctx context.Context, t *testing.T, store adt.Store, balances map[address.Address]balance) [2]cid.Cid {
|
||||||
|
escrowMapRoot := adt.MakeEmptyMap(store)
|
||||||
|
escrowMapRootCid, err := escrowMapRoot.Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
escrowRoot, err := adt.AsBalanceTable(store, escrowMapRootCid)
|
||||||
|
require.NoError(t, err)
|
||||||
|
lockedMapRoot := adt.MakeEmptyMap(store)
|
||||||
|
lockedMapRootCid, err := lockedMapRoot.Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
lockedRoot, err := adt.AsBalanceTable(store, lockedMapRootCid)
|
||||||
|
|
||||||
|
for addr, balance := range balances {
|
||||||
|
err := escrowRoot.Add(addr, big.Add(balance.available, balance.locked))
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lockedRoot.Add(addr, balance.locked)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
|
escrowRootCid, err := escrowRoot.Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
lockedRootCid, err := lockedRoot.Root()
|
||||||
|
require.NoError(t, err)
|
||||||
|
return [2]cid.Cid{escrowRootCid, lockedRootCid}
|
||||||
|
}
|
||||||
|
|
||||||
func createMinerState(ctx context.Context, t *testing.T, store adt.Store, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
|
func createMinerState(ctx context.Context, t *testing.T, store adt.Store, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
|
||||||
rootCid := createSectorsAMT(ctx, t, store, sectors)
|
rootCid := createSectorsAMT(ctx, t, store, sectors)
|
||||||
|
|
||||||
|
@ -2,44 +2,123 @@ package market
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
|
"go.uber.org/fx"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events/state"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("market_adapter")
|
var log = logging.Logger("market_adapter")
|
||||||
|
|
||||||
type FundMgr struct {
|
// API is the dependencies need to run a fund manager
|
||||||
sm *stmgr.StateManager
|
type API struct {
|
||||||
mpool full.MpoolAPI
|
fx.In
|
||||||
|
|
||||||
lk sync.Mutex
|
full.ChainAPI
|
||||||
|
full.StateAPI
|
||||||
|
full.MpoolAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
// FundMgr monitors available balances and adds funds when EnsureAvailable is called
|
||||||
|
type FundMgr struct {
|
||||||
|
api fundMgrAPI
|
||||||
|
|
||||||
|
lk sync.RWMutex
|
||||||
available map[address.Address]types.BigInt
|
available map[address.Address]types.BigInt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFundMgr(sm *stmgr.StateManager, mpool full.MpoolAPI) *FundMgr {
|
// StartFundManager creates a new fund manager and sets up event hooks to manage state changes
|
||||||
return &FundMgr{
|
func StartFundManager(lc fx.Lifecycle, api API) *FundMgr {
|
||||||
sm: sm,
|
fm := newFundMgr(&api)
|
||||||
mpool: mpool,
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
ev := events.NewEvents(ctx, &api)
|
||||||
|
preds := state.NewStatePredicates(&api)
|
||||||
|
dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses)))
|
||||||
|
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
||||||
|
return dealDiffFn(ctx, oldTs.Key(), newTs.Key())
|
||||||
|
}
|
||||||
|
return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence), events.NoTimeout, match)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return fm
|
||||||
|
}
|
||||||
|
|
||||||
|
type fundMgrAPI interface {
|
||||||
|
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
||||||
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||||
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFundMgr(api fundMgrAPI) *FundMgr {
|
||||||
|
return &FundMgr{
|
||||||
|
api: api,
|
||||||
available: map[address.Address]types.BigInt{},
|
available: map[address.Address]types.BigInt{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
|
// checkFunc tells the events api to simply proceed (we always want to watch)
|
||||||
fm.lk.Lock()
|
func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) {
|
||||||
avail, ok := fm.available[addr]
|
return false, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// revert handles reverts to balances
|
||||||
|
func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
// TODO: Is it ok to just ignore this?
|
||||||
|
log.Warn("balance change reverted; TODO: actually handle this!")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// stateChanged handles balance changes monitored on the chain from one tipset to the next
|
||||||
|
func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
||||||
|
changedBalances, ok := states.(state.ChangedBalances)
|
||||||
if !ok {
|
if !ok {
|
||||||
bal, err := fm.sm.MarketBalance(ctx, addr, nil)
|
panic("Expected state.ChangedBalances")
|
||||||
|
}
|
||||||
|
// overwrite our in memory cache with new values from chain (chain is canonical)
|
||||||
|
fm.lk.Lock()
|
||||||
|
for addr, balanceChange := range changedBalances {
|
||||||
|
fm.available[addr] = balanceChange.To
|
||||||
|
}
|
||||||
|
fm.lk.Unlock()
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *FundMgr) getAddresses() []address.Address {
|
||||||
|
fm.lk.RLock()
|
||||||
|
defer fm.lk.RUnlock()
|
||||||
|
addrs := make([]address.Address, 0, len(fm.available))
|
||||||
|
for addr := range fm.available {
|
||||||
|
addrs = append(addrs, addr)
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnsureAvailable looks at the available balance in escrow for a given
|
||||||
|
// address, and if less than the passed in amount, adds the difference
|
||||||
|
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
|
||||||
|
idAddr, err := fm.api.StateLookupID(ctx, addr, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
fm.lk.Lock()
|
||||||
|
avail, ok := fm.available[idAddr]
|
||||||
|
if !ok {
|
||||||
|
bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fm.lk.Unlock()
|
fm.lk.Unlock()
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
@ -48,38 +127,32 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add
|
|||||||
avail = types.BigSub(bal.Escrow, bal.Locked)
|
avail = types.BigSub(bal.Escrow, bal.Locked)
|
||||||
}
|
}
|
||||||
|
|
||||||
toAdd := types.NewInt(0)
|
toAdd := types.BigSub(amt, avail)
|
||||||
avail = types.BigSub(avail, amt)
|
if toAdd.LessThan(types.NewInt(0)) {
|
||||||
if avail.LessThan(types.NewInt(0)) {
|
toAdd = types.NewInt(0)
|
||||||
// TODO: some rules around adding more to avoid doing stuff on-chain
|
|
||||||
// all the time
|
|
||||||
toAdd = avail.Neg()
|
|
||||||
avail = types.NewInt(0)
|
|
||||||
}
|
}
|
||||||
fm.available[addr] = avail
|
fm.available[idAddr] = big.Add(avail, toAdd)
|
||||||
|
|
||||||
fm.lk.Unlock()
|
fm.lk.Unlock()
|
||||||
|
|
||||||
if toAdd.LessThanEqual(big.Zero()) {
|
if toAdd.LessThanEqual(big.Zero()) {
|
||||||
return cid.Undef, nil
|
return cid.Undef, nil
|
||||||
} else {
|
|
||||||
var err error
|
|
||||||
params, err := actors.SerializeParams(&addr)
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
smsg, err := fm.mpool.MpoolPushMessage(ctx, &types.Message{
|
|
||||||
To: builtin.StorageMarketActorAddr,
|
|
||||||
From: wallet,
|
|
||||||
Value: toAdd,
|
|
||||||
Method: builtin.MethodsMarket.AddBalance,
|
|
||||||
Params: params,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return smsg.Cid(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params, err := actors.SerializeParams(&addr)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{
|
||||||
|
To: builtin.StorageMarketActorAddr,
|
||||||
|
From: wallet,
|
||||||
|
Value: toAdd,
|
||||||
|
Method: builtin.MethodsMarket.AddBalance,
|
||||||
|
Params: params,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return smsg.Cid(), nil
|
||||||
}
|
}
|
||||||
|
187
chain/market/fundmgr_test.go
Normal file
187
chain/market/fundmgr_test.go
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
package market
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||||
|
tutils "github.com/filecoin-project/specs-actors/support/testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeAPI struct {
|
||||||
|
returnedBalance api.MarketBalance
|
||||||
|
returnedBalanceErr error
|
||||||
|
signature crypto.Signature
|
||||||
|
receivedMessage *types.Message
|
||||||
|
pushMessageErr error
|
||||||
|
lookupIDErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fapi *fakeAPI) StateLookupID(_ context.Context, addr address.Address, _ types.TipSetKey) (address.Address, error) {
|
||||||
|
return addr, fapi.lookupIDErr
|
||||||
|
}
|
||||||
|
func (fapi *fakeAPI) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) {
|
||||||
|
return fapi.returnedBalance, fapi.returnedBalanceErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fapi *fakeAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
|
||||||
|
fapi.receivedMessage = msg
|
||||||
|
return &types.SignedMessage{
|
||||||
|
Message: *msg,
|
||||||
|
Signature: fapi.signature,
|
||||||
|
}, fapi.pushMessageErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func addFundsMsg(toAdd abi.TokenAmount, addr address.Address, wallet address.Address) *types.Message {
|
||||||
|
params, _ := actors.SerializeParams(&addr)
|
||||||
|
return &types.Message{
|
||||||
|
To: builtin.StorageMarketActorAddr,
|
||||||
|
From: wallet,
|
||||||
|
Value: toAdd,
|
||||||
|
Method: builtin.MethodsMarket.AddBalance,
|
||||||
|
Params: params,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type expectedResult struct {
|
||||||
|
addAmt abi.TokenAmount
|
||||||
|
shouldAdd bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddFunds(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
testCases := map[string]struct {
|
||||||
|
returnedBalanceErr error
|
||||||
|
returnedBalance api.MarketBalance
|
||||||
|
addAmounts []abi.TokenAmount
|
||||||
|
pushMessageErr error
|
||||||
|
expectedResults []expectedResult
|
||||||
|
lookupIDErr error
|
||||||
|
}{
|
||||||
|
"succeeds, trivial case": {
|
||||||
|
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
addAmt: abi.NewTokenAmount(100),
|
||||||
|
shouldAdd: true,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"succeeds, money already present": {
|
||||||
|
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
shouldAdd: false,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"succeeds, multiple adds": {
|
||||||
|
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(200), abi.NewTokenAmount(250), abi.NewTokenAmount(250)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
shouldAdd: false,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
addAmt: abi.NewTokenAmount(100),
|
||||||
|
shouldAdd: true,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
addAmt: abi.NewTokenAmount(50),
|
||||||
|
shouldAdd: true,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
shouldAdd: false,
|
||||||
|
err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"error on market balance": {
|
||||||
|
returnedBalanceErr: errors.New("something went wrong"),
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
err: errors.New("something went wrong"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"error on push message": {
|
||||||
|
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
|
||||||
|
pushMessageErr: errors.New("something went wrong"),
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
err: errors.New("something went wrong"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"error looking up address": {
|
||||||
|
lookupIDErr: errors.New("something went wrong"),
|
||||||
|
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
||||||
|
expectedResults: []expectedResult{
|
||||||
|
{
|
||||||
|
err: errors.New("something went wrong"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testCase, data := range testCases {
|
||||||
|
t.Run(testCase, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
sig := make([]byte, 100)
|
||||||
|
_, err := rand.Read(sig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
fapi := &fakeAPI{
|
||||||
|
returnedBalance: data.returnedBalance,
|
||||||
|
returnedBalanceErr: data.returnedBalanceErr,
|
||||||
|
signature: crypto.Signature{
|
||||||
|
Type: crypto.SigTypeUnknown,
|
||||||
|
Data: sig,
|
||||||
|
},
|
||||||
|
pushMessageErr: data.pushMessageErr,
|
||||||
|
lookupIDErr: data.lookupIDErr,
|
||||||
|
}
|
||||||
|
fundMgr := newFundMgr(fapi)
|
||||||
|
addr := tutils.NewIDAddr(t, uint64(rand.Uint32()))
|
||||||
|
wallet := tutils.NewIDAddr(t, uint64(rand.Uint32()))
|
||||||
|
for i, amount := range data.addAmounts {
|
||||||
|
fapi.receivedMessage = nil
|
||||||
|
_, err := fundMgr.EnsureAvailable(ctx, addr, wallet, amount)
|
||||||
|
expected := data.expectedResults[i]
|
||||||
|
if expected.err == nil {
|
||||||
|
require.NoError(t, err)
|
||||||
|
if expected.shouldAdd {
|
||||||
|
expectedMessage := addFundsMsg(expected.addAmt, addr, wallet)
|
||||||
|
require.Equal(t, expectedMessage, fapi.receivedMessage)
|
||||||
|
} else {
|
||||||
|
require.Nil(t, fapi.receivedMessage)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
require.EqualError(t, err, expected.err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -5,6 +5,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/markets/dealfilter"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
ci "github.com/libp2p/go-libp2p-core/crypto"
|
ci "github.com/libp2p/go-libp2p-core/crypto"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
@ -276,7 +278,7 @@ func Online() Option {
|
|||||||
|
|
||||||
Override(new(*paychmgr.Store), paychmgr.NewStore),
|
Override(new(*paychmgr.Store), paychmgr.NewStore),
|
||||||
Override(new(*paychmgr.Manager), paychmgr.NewManager),
|
Override(new(*paychmgr.Manager), paychmgr.NewManager),
|
||||||
Override(new(*market.FundMgr), market.NewFundMgr),
|
Override(new(*market.FundMgr), market.StartFundManager),
|
||||||
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
|
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user