From b4d1b628be187399ced7b2d579c70f3ab3fbca7f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 4 Aug 2020 17:29:31 -0700 Subject: [PATCH] feat(fundmgr): add monitoring to FundMgr --- chain/market/fundmgr.go | 147 +++++++++++++++++++++++++++++----------- node/builder.go | 5 +- 2 files changed, 111 insertions(+), 41 deletions(-) diff --git a/chain/market/fundmgr.go b/chain/market/fundmgr.go index e390c530f..325707d79 100644 --- a/chain/market/fundmgr.go +++ b/chain/market/fundmgr.go @@ -2,44 +2,118 @@ package market import ( "context" - "github.com/filecoin-project/specs-actors/actors/abi/big" "sync" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "go.uber.org/fx" + "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" ) var log = logging.Logger("market_adapter") -type FundMgr struct { - sm *stmgr.StateManager - mpool full.MpoolAPI +// API is the dependencies need to run a fund manager +type API struct { + fx.In - lk sync.Mutex + full.ChainAPI + full.StateAPI + full.MpoolAPI +} + +// FundMgr monitors available balances and adds funds when EnsureAvailable is called +type FundMgr struct { + api fundMgrAPI + + lk sync.RWMutex available map[address.Address]types.BigInt } -func NewFundMgr(sm *stmgr.StateManager, mpool full.MpoolAPI) *FundMgr { - return &FundMgr{ - sm: sm, - mpool: mpool, +// StartFundManager creates a new fund manager and sets up event hooks to manage state changes +func StartFundManager(lc fx.Lifecycle, api API) *FundMgr { + fm := newFundMgr(&api) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + ev := events.NewEvents(ctx, &api) + preds := state.NewStatePredicates(&api) + dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses))) + match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { + return dealDiffFn(ctx, oldTs.Key(), newTs.Key()) + } + return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence+1), events.NoTimeout, match) + }, + }) + return fm +} +type fundMgrAPI interface { + StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) + MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) +} + +func newFundMgr(api fundMgrAPI) *FundMgr { + return &FundMgr{ + api: api, available: map[address.Address]types.BigInt{}, } } +// checkFunc tells the events api to simply proceed (we always want to watch) +func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) { + return false, true, nil +} + +// revert handles reverts to balances +func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error { + // TODO: Is it ok to just ignore this? + log.Warn("balance change reverted; TODO: actually handle this!") + return nil +} + +// stateChanged handles balance changes monitored on the chain from one tipset to the next +func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { + changedBalances, ok := states.(state.ChangedBalances) + if !ok { + panic("Expected state.ChangedBalances") + } + // overwrite our in memory cache with new values from chain (chain is canonical) + fm.lk.Lock() + for addr, balanceChange := range changedBalances { + fm.available[addr] = balanceChange.To + } + fm.lk.Unlock() + return true, nil +} + +func (fm *FundMgr) getAddresses() []address.Address { + fm.lk.RLock() + defer fm.lk.RUnlock() + addrs := make([]address.Address, 0, len(fm.available)) + for addr := range fm.available { + addrs = append(addrs, addr) + } + return addrs +} + +// EnsureAvailable looks at the available balance in escrow for a given +// address, and if less than the passed in amount, adds the difference func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { fm.lk.Lock() avail, ok := fm.available[addr] if !ok { - bal, err := fm.sm.MarketBalance(ctx, addr, nil) + bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK) if err != nil { fm.lk.Unlock() return cid.Undef, err @@ -48,38 +122,33 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add avail = types.BigSub(bal.Escrow, bal.Locked) } - toAdd := types.NewInt(0) - avail = types.BigSub(avail, amt) - if avail.LessThan(types.NewInt(0)) { - // TODO: some rules around adding more to avoid doing stuff on-chain - // all the time - toAdd = avail.Neg() - avail = types.NewInt(0) + toAdd := types.BigSub(amt, avail) + if toAdd.LessThan(types.NewInt(0)) { + toAdd = types.NewInt(0) } - fm.available[addr] = avail - + fm.available[addr] = big.Add(avail, toAdd) fm.lk.Unlock() if toAdd.LessThanEqual(big.Zero()) { return cid.Undef, nil - } else { - var err error - params, err := actors.SerializeParams(&addr) - if err != nil { - return cid.Undef, err - } - - smsg, err := fm.mpool.MpoolPushMessage(ctx, &types.Message{ - To: builtin.StorageMarketActorAddr, - From: wallet, - Value: toAdd, - Method: builtin.MethodsMarket.AddBalance, - Params: params, - }) - if err != nil { - return cid.Undef, err - } - - return smsg.Cid(), nil } + + var err error + params, err := actors.SerializeParams(&addr) + if err != nil { + return cid.Undef, err + } + + smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{ + To: builtin.StorageMarketActorAddr, + From: wallet, + Value: toAdd, + Method: builtin.MethodsMarket.AddBalance, + Params: params, + }) + if err != nil { + return cid.Undef, err + } + + return smsg.Cid(), nil } diff --git a/node/builder.go b/node/builder.go index 171c4a96e..78d4d0d4a 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,9 +3,10 @@ package node import ( "context" "errors" - "github.com/filecoin-project/lotus/markets/dealfilter" "time" + "github.com/filecoin-project/lotus/markets/dealfilter" + logging "github.com/ipfs/go-log" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -274,7 +275,7 @@ func Online() Option { Override(new(*paychmgr.Store), paychmgr.NewStore), Override(new(*paychmgr.Manager), paychmgr.NewManager), - Override(new(*market.FundMgr), market.NewFundMgr), + Override(new(*market.FundMgr), market.StartFundManager), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), ),