From f4720ddb2cd2616ddb963d57e6d4f087fbbaf539 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 4 Aug 2020 17:28:05 -0700 Subject: [PATCH 1/4] feat(state): add predicate for deal id watching --- chain/events/state/predicates.go | 96 ++++++++++++++++++++++ chain/events/state/predicates_test.go | 114 ++++++++++++++++++++++++-- 2 files changed, 205 insertions(+), 5 deletions(-) diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 59f18753d..ca1792e53 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -10,6 +10,7 @@ import ( typegen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -86,6 +87,50 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di }) } +type BalanceTables struct { + EscrowTable *adt.BalanceTable + LockedTable *adt.BalanceTable +} + +// DiffBalanceTablesFunc compares two balance tables +type DiffBalanceTablesFunc func(ctx context.Context, oldBalanceTable, newBalanceTable BalanceTables) (changed bool, user UserData, err error) + +// OnBalanceChanged runs when the escrow table for available balances changes +func (sp *StatePredicates) OnBalanceChanged(diffBalances DiffBalanceTablesFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.EscrowTable.Equals(newState.EscrowTable) && oldState.LockedTable.Equals(newState.LockedTable) { + return false, nil, nil + } + + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + oldEscrowRoot, err := adt.AsBalanceTable(ctxStore, oldState.EscrowTable) + if err != nil { + return false, nil, err + } + + oldLockedRoot, err := adt.AsBalanceTable(ctxStore, oldState.LockedTable) + if err != nil { + return false, nil, err + } + + newEscrowRoot, err := adt.AsBalanceTable(ctxStore, newState.EscrowTable) + if err != nil { + return false, nil, err + } + + newLockedRoot, err := adt.AsBalanceTable(ctxStore, newState.LockedTable) + if err != nil { + return false, nil, err + } + + return diffBalances(ctx, BalanceTables{oldEscrowRoot, oldLockedRoot}, BalanceTables{newEscrowRoot, newLockedRoot}) + } +} + type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error) // OnDealStateChanged calls diffDealStates when the market deal state changes @@ -309,6 +354,57 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtA } } +// ChangedBalances is a set of changes to deal state +type ChangedBalances map[address.Address]BalanceChange + +// BalanceChange is a change in balance from -> to +type BalanceChange struct { + From abi.TokenAmount + To abi.TokenAmount +} + +// AvailableBalanceChangedForAddresses detects changes in the escrow table for the given addresses +func (sp *StatePredicates) AvailableBalanceChangedForAddresses(getAddrs func() []address.Address) DiffBalanceTablesFunc { + return func(ctx context.Context, oldBalances, newBalances BalanceTables) (changed bool, user UserData, err error) { + changedBalances := make(ChangedBalances) + addrs := getAddrs() + for _, addr := range addrs { + // If the deal has been removed, we just set it to nil + oldEscrowBalance, err := oldBalances.EscrowTable.Get(addr) + if err != nil { + return false, nil, err + } + + oldLockedBalance, err := oldBalances.LockedTable.Get(addr) + if err != nil { + return false, nil, err + } + + oldBalance := big.Sub(oldEscrowBalance, oldLockedBalance) + + newEscrowBalance, err := newBalances.EscrowTable.Get(addr) + if err != nil { + return false, nil, err + } + + newLockedBalance, err := newBalances.LockedTable.Get(addr) + if err != nil { + return false, nil, err + } + + newBalance := big.Sub(newEscrowBalance, newLockedBalance) + + if !oldBalance.Equals(newBalance) { + changedBalances[addr] = BalanceChange{oldBalance, newBalance} + } + } + if len(changedBalances) > 0 { + return true, changedBalances, nil + } + return false, nil, nil + } +} + type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error) func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc { diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 02461c331..1c08703fa 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -2,9 +2,10 @@ package state import ( "context" - "github.com/filecoin-project/go-bitfield" "testing" + "github.com/filecoin-project/go-bitfield" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" @@ -112,7 +113,14 @@ func TestMarketPredicates(t *testing.T) { abi.DealID(2): oldProp2, } - oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps) + oldBalances := map[address.Address]balance{ + tutils.NewIDAddr(t, 1): balance{abi.NewTokenAmount(1000), abi.NewTokenAmount(1000)}, + tutils.NewIDAddr(t, 2): balance{abi.NewTokenAmount(2000), abi.NewTokenAmount(500)}, + tutils.NewIDAddr(t, 3): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(2000)}, + tutils.NewIDAddr(t, 5): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(1000)}, + } + + oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps, oldBalances) newDeal1 := &market.DealState{ SectorStartEpoch: 1, @@ -153,7 +161,14 @@ func TestMarketPredicates(t *testing.T) { abi.DealID(3): newProp3, // new // NB: DealProposals cannot be modified, so don't test that case. } - newStateC := createMarketState(ctx, t, store, newDeals, newProps) + newBalances := map[address.Address]balance{ + tutils.NewIDAddr(t, 1): balance{abi.NewTokenAmount(3000), abi.NewTokenAmount(0)}, + tutils.NewIDAddr(t, 2): balance{abi.NewTokenAmount(2000), abi.NewTokenAmount(500)}, + tutils.NewIDAddr(t, 4): balance{abi.NewTokenAmount(5000), abi.NewTokenAmount(0)}, + tutils.NewIDAddr(t, 5): balance{abi.NewTokenAmount(1000), abi.NewTokenAmount(3000)}, + } + + newStateC := createMarketState(ctx, t, store, newDeals, newProps, newBalances) minerAddr, err := address.NewFromString("t00") require.NoError(t, err) @@ -276,6 +291,63 @@ func TestMarketPredicates(t *testing.T) { require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID) require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal) }) + + t.Run("balances predicate", func(t *testing.T) { + preds := NewStatePredicates(api) + + getAddresses := func() []address.Address { + return []address.Address{tutils.NewIDAddr(t, 1), tutils.NewIDAddr(t, 2), tutils.NewIDAddr(t, 3), tutils.NewIDAddr(t, 4)} + } + diffBalancesFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(getAddresses))) + + // Diff a state against itself: expect no change + changed, _, err := diffBalancesFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + // Diff old state against new state + changed, valIDs, err := diffBalancesFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) + + changedBalances, ok := valIDs.(ChangedBalances) + require.True(t, ok) + require.Len(t, changedBalances, 3) + require.Contains(t, changedBalances, tutils.NewIDAddr(t, 1)) + require.Contains(t, changedBalances, tutils.NewIDAddr(t, 3)) + require.Contains(t, changedBalances, tutils.NewIDAddr(t, 4)) + + balance1 := changedBalances[tutils.NewIDAddr(t, 1)] + if !balance1.From.Equals(abi.NewTokenAmount(1000)) || !balance1.To.Equals(abi.NewTokenAmount(3000)) { + t.Fatal("Unexpected change to balance") + } + balance3 := changedBalances[tutils.NewIDAddr(t, 3)] + if !balance3.From.Equals(abi.NewTokenAmount(3000)) || !balance3.To.Equals(abi.NewTokenAmount(0)) { + t.Fatal("Unexpected change to balance") + } + balance4 := changedBalances[tutils.NewIDAddr(t, 4)] + if !balance4.From.Equals(abi.NewTokenAmount(0)) || !balance4.To.Equals(abi.NewTokenAmount(5000)) { + t.Fatal("Unexpected change to balance") + } + + // Diff with non-existent address. + getNoAddress := func() []address.Address { return []address.Address{tutils.NewIDAddr(t, 6)} } + diffNoAddressFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(getNoAddress))) + changed, _, err = diffNoAddressFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.False(t, changed) + + // Test that OnBalanceChanged does not call the callback if the state has not changed + diffDealBalancesFn := preds.OnBalanceChanged(func(context.Context, BalanceTables, BalanceTables) (bool, UserData, error) { + t.Fatal("No state change so this should not be called") + return false, nil, nil + }) + marketState := createEmptyMarketState(t, store) + changed, _, err = diffDealBalancesFn(ctx, marketState, marketState) + require.NoError(t, err) + require.False(t, changed) + }) + } func TestMinerSectorChange(t *testing.T) { @@ -373,13 +445,20 @@ func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, err }}) } -func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal) cid.Cid { +type balance struct { + available abi.TokenAmount + locked abi.TokenAmount +} + +func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal, balances map[address.Address]balance) cid.Cid { dealRootCid := createDealAMT(ctx, t, store, deals) propRootCid := createProposalAMT(ctx, t, store, props) - + balancesCids := createBalanceTable(ctx, t, store, balances) state := createEmptyMarketState(t, store) state.States = dealRootCid state.Proposals = propRootCid + state.EscrowTable = balancesCids[0] + state.LockedTable = balancesCids[1] stateC, err := store.Put(ctx, state) require.NoError(t, err) @@ -416,6 +495,31 @@ func createProposalAMT(ctx context.Context, t *testing.T, store adt.Store, props return rootCid } +func createBalanceTable(ctx context.Context, t *testing.T, store adt.Store, balances map[address.Address]balance) [2]cid.Cid { + escrowMapRoot := adt.MakeEmptyMap(store) + escrowMapRootCid, err := escrowMapRoot.Root() + require.NoError(t, err) + escrowRoot, err := adt.AsBalanceTable(store, escrowMapRootCid) + require.NoError(t, err) + lockedMapRoot := adt.MakeEmptyMap(store) + lockedMapRootCid, err := lockedMapRoot.Root() + require.NoError(t, err) + lockedRoot, err := adt.AsBalanceTable(store, lockedMapRootCid) + + for addr, balance := range balances { + err := escrowRoot.Add(addr, big.Add(balance.available, balance.locked)) + require.NoError(t, err) + err = lockedRoot.Add(addr, balance.locked) + require.NoError(t, err) + + } + escrowRootCid, err := escrowRoot.Root() + require.NoError(t, err) + lockedRootCid, err := lockedRoot.Root() + require.NoError(t, err) + return [2]cid.Cid{escrowRootCid, lockedRootCid} +} + func createMinerState(ctx context.Context, t *testing.T, store adt.Store, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { rootCid := createSectorsAMT(ctx, t, store, sectors) From b4d1b628be187399ced7b2d579c70f3ab3fbca7f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 4 Aug 2020 17:29:31 -0700 Subject: [PATCH 2/4] feat(fundmgr): add monitoring to FundMgr --- chain/market/fundmgr.go | 147 +++++++++++++++++++++++++++++----------- node/builder.go | 5 +- 2 files changed, 111 insertions(+), 41 deletions(-) diff --git a/chain/market/fundmgr.go b/chain/market/fundmgr.go index e390c530f..325707d79 100644 --- a/chain/market/fundmgr.go +++ b/chain/market/fundmgr.go @@ -2,44 +2,118 @@ package market import ( "context" - "github.com/filecoin-project/specs-actors/actors/abi/big" "sync" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "go.uber.org/fx" + "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" ) var log = logging.Logger("market_adapter") -type FundMgr struct { - sm *stmgr.StateManager - mpool full.MpoolAPI +// API is the dependencies need to run a fund manager +type API struct { + fx.In - lk sync.Mutex + full.ChainAPI + full.StateAPI + full.MpoolAPI +} + +// FundMgr monitors available balances and adds funds when EnsureAvailable is called +type FundMgr struct { + api fundMgrAPI + + lk sync.RWMutex available map[address.Address]types.BigInt } -func NewFundMgr(sm *stmgr.StateManager, mpool full.MpoolAPI) *FundMgr { - return &FundMgr{ - sm: sm, - mpool: mpool, +// StartFundManager creates a new fund manager and sets up event hooks to manage state changes +func StartFundManager(lc fx.Lifecycle, api API) *FundMgr { + fm := newFundMgr(&api) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + ev := events.NewEvents(ctx, &api) + preds := state.NewStatePredicates(&api) + dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses))) + match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { + return dealDiffFn(ctx, oldTs.Key(), newTs.Key()) + } + return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence+1), events.NoTimeout, match) + }, + }) + return fm +} +type fundMgrAPI interface { + StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) + MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) +} + +func newFundMgr(api fundMgrAPI) *FundMgr { + return &FundMgr{ + api: api, available: map[address.Address]types.BigInt{}, } } +// checkFunc tells the events api to simply proceed (we always want to watch) +func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) { + return false, true, nil +} + +// revert handles reverts to balances +func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error { + // TODO: Is it ok to just ignore this? + log.Warn("balance change reverted; TODO: actually handle this!") + return nil +} + +// stateChanged handles balance changes monitored on the chain from one tipset to the next +func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { + changedBalances, ok := states.(state.ChangedBalances) + if !ok { + panic("Expected state.ChangedBalances") + } + // overwrite our in memory cache with new values from chain (chain is canonical) + fm.lk.Lock() + for addr, balanceChange := range changedBalances { + fm.available[addr] = balanceChange.To + } + fm.lk.Unlock() + return true, nil +} + +func (fm *FundMgr) getAddresses() []address.Address { + fm.lk.RLock() + defer fm.lk.RUnlock() + addrs := make([]address.Address, 0, len(fm.available)) + for addr := range fm.available { + addrs = append(addrs, addr) + } + return addrs +} + +// EnsureAvailable looks at the available balance in escrow for a given +// address, and if less than the passed in amount, adds the difference func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { fm.lk.Lock() avail, ok := fm.available[addr] if !ok { - bal, err := fm.sm.MarketBalance(ctx, addr, nil) + bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK) if err != nil { fm.lk.Unlock() return cid.Undef, err @@ -48,38 +122,33 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add avail = types.BigSub(bal.Escrow, bal.Locked) } - toAdd := types.NewInt(0) - avail = types.BigSub(avail, amt) - if avail.LessThan(types.NewInt(0)) { - // TODO: some rules around adding more to avoid doing stuff on-chain - // all the time - toAdd = avail.Neg() - avail = types.NewInt(0) + toAdd := types.BigSub(amt, avail) + if toAdd.LessThan(types.NewInt(0)) { + toAdd = types.NewInt(0) } - fm.available[addr] = avail - + fm.available[addr] = big.Add(avail, toAdd) fm.lk.Unlock() if toAdd.LessThanEqual(big.Zero()) { return cid.Undef, nil - } else { - var err error - params, err := actors.SerializeParams(&addr) - if err != nil { - return cid.Undef, err - } - - smsg, err := fm.mpool.MpoolPushMessage(ctx, &types.Message{ - To: builtin.StorageMarketActorAddr, - From: wallet, - Value: toAdd, - Method: builtin.MethodsMarket.AddBalance, - Params: params, - }) - if err != nil { - return cid.Undef, err - } - - return smsg.Cid(), nil } + + var err error + params, err := actors.SerializeParams(&addr) + if err != nil { + return cid.Undef, err + } + + smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{ + To: builtin.StorageMarketActorAddr, + From: wallet, + Value: toAdd, + Method: builtin.MethodsMarket.AddBalance, + Params: params, + }) + if err != nil { + return cid.Undef, err + } + + return smsg.Cid(), nil } diff --git a/node/builder.go b/node/builder.go index 171c4a96e..78d4d0d4a 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,9 +3,10 @@ package node import ( "context" "errors" - "github.com/filecoin-project/lotus/markets/dealfilter" "time" + "github.com/filecoin-project/lotus/markets/dealfilter" + logging "github.com/ipfs/go-log" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -274,7 +275,7 @@ func Online() Option { Override(new(*paychmgr.Store), paychmgr.NewStore), Override(new(*paychmgr.Manager), paychmgr.NewManager), - Override(new(*market.FundMgr), market.NewFundMgr), + Override(new(*market.FundMgr), market.StartFundManager), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), ), From b2a114a808817c9140510172054a7595107b635a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 4 Aug 2020 18:16:25 -0700 Subject: [PATCH 3/4] test(fundmgr): add FundMgr test --- chain/market/fundmgr_test.go | 172 +++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 chain/market/fundmgr_test.go diff --git a/chain/market/fundmgr_test.go b/chain/market/fundmgr_test.go new file mode 100644 index 000000000..49db12e45 --- /dev/null +++ b/chain/market/fundmgr_test.go @@ -0,0 +1,172 @@ +package market + +import ( + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/crypto" + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +type fakeAPI struct { + returnedBalance api.MarketBalance + returnedBalanceErr error + signature crypto.Signature + receivedMessage *types.Message + pushMessageErr error +} + +func (fapi *fakeAPI) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) { + return fapi.returnedBalance, fapi.returnedBalanceErr +} + +func (fapi *fakeAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) { + fapi.receivedMessage = msg + return &types.SignedMessage{ + Message: *msg, + Signature: fapi.signature, + }, fapi.pushMessageErr +} + +func addFundsMsg(toAdd abi.TokenAmount, addr address.Address, wallet address.Address) *types.Message { + params, _ := actors.SerializeParams(&addr) + return &types.Message{ + To: builtin.StorageMarketActorAddr, + From: wallet, + Value: toAdd, + Method: builtin.MethodsMarket.AddBalance, + Params: params, + } +} + +type expectedResult struct { + addAmt abi.TokenAmount + shouldAdd bool + err error +} + +func TestAddFunds(t *testing.T) { + ctx := context.Background() + testCases := map[string]struct { + returnedBalanceErr error + returnedBalance api.MarketBalance + addAmounts []abi.TokenAmount + pushMessageErr error + expectedResults []expectedResult + }{ + "succeeds, trivial case": { + returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)}, + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)}, + expectedResults: []expectedResult{ + { + addAmt: abi.NewTokenAmount(100), + shouldAdd: true, + err: nil, + }, + }, + }, + "succeeds, money already present": { + returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)}, + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)}, + expectedResults: []expectedResult{ + { + shouldAdd: false, + err: nil, + }, + }, + }, + "succeeds, multiple adds": { + returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)}, + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(200), abi.NewTokenAmount(250), abi.NewTokenAmount(250)}, + expectedResults: []expectedResult{ + { + shouldAdd: false, + err: nil, + }, + { + addAmt: abi.NewTokenAmount(100), + shouldAdd: true, + err: nil, + }, + { + addAmt: abi.NewTokenAmount(50), + shouldAdd: true, + err: nil, + }, + { + shouldAdd: false, + err: nil, + }, + }, + }, + "error on market balance": { + returnedBalanceErr: errors.New("something went wrong"), + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)}, + expectedResults: []expectedResult{ + { + err: errors.New("something went wrong"), + }, + }, + }, + "error on push message": { + returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)}, + pushMessageErr: errors.New("something went wrong"), + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)}, + expectedResults: []expectedResult{ + { + err: errors.New("something went wrong"), + }, + }, + }, + } + + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + sig := make([]byte, 100) + _, err := rand.Read(sig) + require.NoError(t, err) + fapi := &fakeAPI{ + returnedBalance: data.returnedBalance, + returnedBalanceErr: data.returnedBalanceErr, + signature: crypto.Signature{ + Type: crypto.SigTypeUnknown, + Data: sig, + }, + pushMessageErr: data.pushMessageErr, + } + fundMgr := newFundMgr(fapi) + addr := tutils.NewIDAddr(t, uint64(rand.Uint32())) + wallet := tutils.NewIDAddr(t, uint64(rand.Uint32())) + for i, amount := range data.addAmounts { + fapi.receivedMessage = nil + _, err := fundMgr.EnsureAvailable(ctx, addr, wallet, amount) + expected := data.expectedResults[i] + if expected.err == nil { + require.NoError(t, err) + if expected.shouldAdd { + expectedMessage := addFundsMsg(expected.addAmt, addr, wallet) + require.Equal(t, expectedMessage, fapi.receivedMessage) + } else { + require.Nil(t, fapi.receivedMessage) + } + } else { + require.EqualError(t, err, expected.err.Error()) + } + } + }) + } +} From 9babf34a0c2e180d1d734d9729201e6d9c9364bf Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 5 Aug 2020 18:33:11 -0700 Subject: [PATCH 4/4] fix(fundmgr): switch to id addresses in order to observe the balance table correctly, convert to tracking funds by id address --- chain/market/fundmgr.go | 12 ++++++++---- chain/market/fundmgr_test.go | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/chain/market/fundmgr.go b/chain/market/fundmgr.go index 325707d79..b7673ab35 100644 --- a/chain/market/fundmgr.go +++ b/chain/market/fundmgr.go @@ -52,7 +52,7 @@ func StartFundManager(lc fx.Lifecycle, api API) *FundMgr { match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { return dealDiffFn(ctx, oldTs.Key(), newTs.Key()) } - return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence+1), events.NoTimeout, match) + return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence), events.NoTimeout, match) }, }) return fm @@ -61,6 +61,7 @@ func StartFundManager(lc fx.Lifecycle, api API) *FundMgr { type fundMgrAPI interface { StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) } func newFundMgr(api fundMgrAPI) *FundMgr { @@ -110,8 +111,12 @@ func (fm *FundMgr) getAddresses() []address.Address { // EnsureAvailable looks at the available balance in escrow for a given // address, and if less than the passed in amount, adds the difference func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { + idAddr, err := fm.api.StateLookupID(ctx, addr, types.EmptyTSK) + if err != nil { + return cid.Undef, err + } fm.lk.Lock() - avail, ok := fm.available[addr] + avail, ok := fm.available[idAddr] if !ok { bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK) if err != nil { @@ -126,14 +131,13 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add if toAdd.LessThan(types.NewInt(0)) { toAdd = types.NewInt(0) } - fm.available[addr] = big.Add(avail, toAdd) + fm.available[idAddr] = big.Add(avail, toAdd) fm.lk.Unlock() if toAdd.LessThanEqual(big.Zero()) { return cid.Undef, nil } - var err error params, err := actors.SerializeParams(&addr) if err != nil { return cid.Undef, err diff --git a/chain/market/fundmgr_test.go b/chain/market/fundmgr_test.go index 49db12e45..7010b3344 100644 --- a/chain/market/fundmgr_test.go +++ b/chain/market/fundmgr_test.go @@ -26,8 +26,12 @@ type fakeAPI struct { signature crypto.Signature receivedMessage *types.Message pushMessageErr error + lookupIDErr error } +func (fapi *fakeAPI) StateLookupID(_ context.Context, addr address.Address, _ types.TipSetKey) (address.Address, error) { + return addr, fapi.lookupIDErr +} func (fapi *fakeAPI) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) { return fapi.returnedBalance, fapi.returnedBalanceErr } @@ -65,6 +69,7 @@ func TestAddFunds(t *testing.T) { addAmounts []abi.TokenAmount pushMessageErr error expectedResults []expectedResult + lookupIDErr error }{ "succeeds, trivial case": { returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)}, @@ -130,6 +135,15 @@ func TestAddFunds(t *testing.T) { }, }, }, + "error looking up address": { + lookupIDErr: errors.New("something went wrong"), + addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)}, + expectedResults: []expectedResult{ + { + err: errors.New("something went wrong"), + }, + }, + }, } for testCase, data := range testCases { @@ -147,6 +161,7 @@ func TestAddFunds(t *testing.T) { Data: sig, }, pushMessageErr: data.pushMessageErr, + lookupIDErr: data.lookupIDErr, } fundMgr := newFundMgr(fapi) addr := tutils.NewIDAddr(t, uint64(rand.Uint32()))