feat(fundmgr): add monitoring to FundMgr

This commit is contained in:
hannahhoward 2020-08-04 17:29:31 -07:00
parent f4720ddb2c
commit b4d1b628be
2 changed files with 111 additions and 41 deletions

View File

@ -2,44 +2,118 @@ package market
import ( import (
"context" "context"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"sync" "sync"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"go.uber.org/fx"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
) )
var log = logging.Logger("market_adapter") var log = logging.Logger("market_adapter")
type FundMgr struct { // API is the dependencies need to run a fund manager
sm *stmgr.StateManager type API struct {
mpool full.MpoolAPI fx.In
lk sync.Mutex full.ChainAPI
full.StateAPI
full.MpoolAPI
}
// FundMgr monitors available balances and adds funds when EnsureAvailable is called
type FundMgr struct {
api fundMgrAPI
lk sync.RWMutex
available map[address.Address]types.BigInt available map[address.Address]types.BigInt
} }
func NewFundMgr(sm *stmgr.StateManager, mpool full.MpoolAPI) *FundMgr { // StartFundManager creates a new fund manager and sets up event hooks to manage state changes
return &FundMgr{ func StartFundManager(lc fx.Lifecycle, api API) *FundMgr {
sm: sm, fm := newFundMgr(&api)
mpool: mpool, lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ev := events.NewEvents(ctx, &api)
preds := state.NewStatePredicates(&api)
dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses)))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiffFn(ctx, oldTs.Key(), newTs.Key())
}
return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, int(build.MessageConfidence+1), events.NoTimeout, match)
},
})
return fm
}
type fundMgrAPI interface {
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
}
func newFundMgr(api fundMgrAPI) *FundMgr {
return &FundMgr{
api: api,
available: map[address.Address]types.BigInt{}, available: map[address.Address]types.BigInt{},
} }
} }
// checkFunc tells the events api to simply proceed (we always want to watch)
func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) {
return false, true, nil
}
// revert handles reverts to balances
func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error {
// TODO: Is it ok to just ignore this?
log.Warn("balance change reverted; TODO: actually handle this!")
return nil
}
// stateChanged handles balance changes monitored on the chain from one tipset to the next
func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
changedBalances, ok := states.(state.ChangedBalances)
if !ok {
panic("Expected state.ChangedBalances")
}
// overwrite our in memory cache with new values from chain (chain is canonical)
fm.lk.Lock()
for addr, balanceChange := range changedBalances {
fm.available[addr] = balanceChange.To
}
fm.lk.Unlock()
return true, nil
}
func (fm *FundMgr) getAddresses() []address.Address {
fm.lk.RLock()
defer fm.lk.RUnlock()
addrs := make([]address.Address, 0, len(fm.available))
for addr := range fm.available {
addrs = append(addrs, addr)
}
return addrs
}
// EnsureAvailable looks at the available balance in escrow for a given
// address, and if less than the passed in amount, adds the difference
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
fm.lk.Lock() fm.lk.Lock()
avail, ok := fm.available[addr] avail, ok := fm.available[addr]
if !ok { if !ok {
bal, err := fm.sm.MarketBalance(ctx, addr, nil) bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
if err != nil { if err != nil {
fm.lk.Unlock() fm.lk.Unlock()
return cid.Undef, err return cid.Undef, err
@ -48,28 +122,24 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add
avail = types.BigSub(bal.Escrow, bal.Locked) avail = types.BigSub(bal.Escrow, bal.Locked)
} }
toAdd := types.NewInt(0) toAdd := types.BigSub(amt, avail)
avail = types.BigSub(avail, amt) if toAdd.LessThan(types.NewInt(0)) {
if avail.LessThan(types.NewInt(0)) { toAdd = types.NewInt(0)
// TODO: some rules around adding more to avoid doing stuff on-chain
// all the time
toAdd = avail.Neg()
avail = types.NewInt(0)
} }
fm.available[addr] = avail fm.available[addr] = big.Add(avail, toAdd)
fm.lk.Unlock() fm.lk.Unlock()
if toAdd.LessThanEqual(big.Zero()) { if toAdd.LessThanEqual(big.Zero()) {
return cid.Undef, nil return cid.Undef, nil
} else { }
var err error var err error
params, err := actors.SerializeParams(&addr) params, err := actors.SerializeParams(&addr)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
smsg, err := fm.mpool.MpoolPushMessage(ctx, &types.Message{ smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{
To: builtin.StorageMarketActorAddr, To: builtin.StorageMarketActorAddr,
From: wallet, From: wallet,
Value: toAdd, Value: toAdd,
@ -82,4 +152,3 @@ func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Add
return smsg.Cid(), nil return smsg.Cid(), nil
} }
}

View File

@ -3,9 +3,10 @@ package node
import ( import (
"context" "context"
"errors" "errors"
"github.com/filecoin-project/lotus/markets/dealfilter"
"time" "time"
"github.com/filecoin-project/lotus/markets/dealfilter"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
ci "github.com/libp2p/go-libp2p-core/crypto" ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -274,7 +275,7 @@ func Online() Option {
Override(new(*paychmgr.Store), paychmgr.NewStore), Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager), Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(new(*market.FundMgr), market.NewFundMgr), Override(new(*market.FundMgr), market.StartFundManager),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
), ),