From 4d3cd7dcb82af2b3cf55fffd63fa7c4e1555576c Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 5 Nov 2020 17:50:40 +0100 Subject: [PATCH] refactor: FundManager --- chain/market/cbor_gen.go | 126 +++++++ chain/market/fundmanager.go | 600 +++++++++++++++++++++++++++++++ chain/market/fundmanager_test.go | 506 ++++++++++++++++++++++++++ chain/market/fundmgr.go | 3 - chain/market/store.go | 73 ++++ gen/main.go | 10 + 6 files changed, 1315 insertions(+), 3 deletions(-) create mode 100644 chain/market/cbor_gen.go create mode 100644 chain/market/fundmanager.go create mode 100644 chain/market/fundmanager_test.go create mode 100644 chain/market/store.go diff --git a/chain/market/cbor_gen.go b/chain/market/cbor_gen.go new file mode 100644 index 000000000..85f65121b --- /dev/null +++ b/chain/market/cbor_gen.go @@ -0,0 +1,126 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package market + +import ( + "fmt" + "io" + + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf + +var lengthBufFundedAddressState = []byte{132} + +func (t *FundedAddressState) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write(lengthBufFundedAddressState); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Wallet (address.Address) (struct) + if err := t.Wallet.MarshalCBOR(w); err != nil { + return err + } + + // t.Addr (address.Address) (struct) + if err := t.Addr.MarshalCBOR(w); err != nil { + return err + } + + // t.AmtReserved (big.Int) (struct) + if err := t.AmtReserved.MarshalCBOR(w); err != nil { + return err + } + + // t.MsgCid (cid.Cid) (struct) + + if t.MsgCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.MsgCid); err != nil { + return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err) + } + } + + return nil +} + +func (t *FundedAddressState) UnmarshalCBOR(r io.Reader) error { + *t = FundedAddressState{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Wallet (address.Address) (struct) + + { + + if err := t.Wallet.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Wallet: %w", err) + } + + } + // t.Addr (address.Address) (struct) + + { + + if err := t.Addr.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Addr: %w", err) + } + + } + // t.AmtReserved (big.Int) (struct) + + { + + if err := t.AmtReserved.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.AmtReserved: %w", err) + } + + } + // t.MsgCid (cid.Cid) (struct) + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.MsgCid: %w", err) + } + + t.MsgCid = &c + } + + } + return nil +} diff --git a/chain/market/fundmanager.go b/chain/market/fundmanager.go new file mode 100644 index 000000000..1834abf31 --- /dev/null +++ b/chain/market/fundmanager.go @@ -0,0 +1,600 @@ +package market + +import ( + "context" + "sync" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + + "github.com/filecoin-project/lotus/build" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/impl/full" + "go.uber.org/fx" +) + +var log = logging.Logger("market_adapter") + +// API is the fx dependencies need to run a fund manager +type FundManagerAPI struct { + fx.In + + full.StateAPI + full.MpoolAPI +} + +// fundManagerAPI is the specific methods called by the FundManager +type fundManagerAPI interface { + MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) + StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) + StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) +} + +// FundManager keeps track of funds in a set of addresses +type FundManager struct { + ctx context.Context + shutdown context.CancelFunc + api fundManagerAPI + wallet address.Address + str *Store + + lk sync.Mutex + fundedAddrs map[address.Address]*fundedAddress +} + +type waitSentinel cid.Cid + +var waitSentinelUndef = waitSentinel(cid.Undef) + +func NewFundManager(api fundManagerAPI, ds datastore.Batching, wallet address.Address) *FundManager { + ctx, cancel := context.WithCancel(context.Background()) + return &FundManager{ + ctx: ctx, + shutdown: cancel, + api: api, + wallet: wallet, + str: newStore(ds), + fundedAddrs: make(map[address.Address]*fundedAddress), + } +} + +func (fm *FundManager) Stop() { + fm.shutdown() +} + +func (fm *FundManager) Start() error { + fm.lk.Lock() + defer fm.lk.Unlock() + + // TODO: + // To save memory: + // - in State() only load addresses with in-progress messages + // - load the others just-in-time from getFundedAddress + // - delete(fm.fundedAddrs, addr) when the queue has been processed + return fm.str.forEach(func(state *FundedAddressState) { + fa := newFundedAddress(fm, state.Addr) + fa.state = state + fm.fundedAddrs[fa.state.Addr] = fa + fa.start() + }) +} + +// Creates a fundedAddress if it doesn't already exist, and returns it +func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress { + fm.lk.Lock() + defer fm.lk.Unlock() + + fa, ok := fm.fundedAddrs[addr] + if !ok { + fa = newFundedAddress(fm, addr) + fm.fundedAddrs[addr] = fa + } + return fa +} + +// Reserve adds amt to `reserved`. If there is not enough available funds for +// the address, submits a message on chain to top up available funds. +func (fm *FundManager) Reserve(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) { + return fm.getFundedAddress(addr).reserve(ctx, amt) +} + +// Subtract from `reserved`. +func (fm *FundManager) Release(ctx context.Context, addr address.Address, amt abi.TokenAmount) error { + return fm.getFundedAddress(addr).release(ctx, amt) +} + +// Withdraw unreserved funds. Only succeeds if there are enough unreserved +// funds for the address. +func (fm *FundManager) Withdraw(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) { + return fm.getFundedAddress(addr).withdraw(ctx, amt) +} + +// Waits for a reserve or withdraw to complete. +func (fm *FundManager) Wait(ctx context.Context, sentinel waitSentinel) error { + _, err := fm.api.StateWaitMsg(ctx, cid.Cid(sentinel), build.MessageConfidence) + return err +} + +// FundedAddressState keeps track of the state of an address with funds in the +// datastore +type FundedAddressState struct { + // Wallet is the wallet from which funds are added to the address + Wallet address.Address + Addr address.Address + // AmtReserved is the amount that must be kept in the address (cannot be + // withdrawn) + AmtReserved abi.TokenAmount + // MsgCid is the cid of an in-progress on-chain message + MsgCid *cid.Cid +} + +// fundedAddress keeps track of the state and request queues for a +// particular address +type fundedAddress struct { + ctx context.Context + env *fundManagerEnvironment + str *Store + + lk sync.Mutex + state *FundedAddressState + + // Note: These request queues are ephemeral, they are not saved to store + reservations []*fundRequest + releases []*fundRequest + withdrawals []*fundRequest + + // Used by the tests + onProcessStartListener func() bool +} + +func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress { + return &fundedAddress{ + ctx: fm.ctx, + env: &fundManagerEnvironment{api: fm.api}, + str: fm.str, + state: &FundedAddressState{ + Wallet: fm.wallet, + Addr: addr, + AmtReserved: abi.NewTokenAmount(0), + }, + } +} + +// If there is a in-progress on-chain message, don't submit any more messages +// on chain until it completes +func (a *fundedAddress) start() { + a.lk.Lock() + defer a.lk.Unlock() + + if a.state.MsgCid != nil { + a.debugf("restart: wait for %s", a.state.MsgCid) + a.startWaitForResults(*a.state.MsgCid) + } +} + +func (a *fundedAddress) reserve(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) { + return a.requestAndWait(ctx, amt, &a.reservations) +} + +func (a *fundedAddress) release(ctx context.Context, amt abi.TokenAmount) error { + _, err := a.requestAndWait(ctx, amt, &a.releases) + return err +} + +func (a *fundedAddress) withdraw(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) { + return a.requestAndWait(ctx, amt, &a.withdrawals) +} + +func (a *fundedAddress) requestAndWait(ctx context.Context, amt abi.TokenAmount, reqs *[]*fundRequest) (waitSentinel, error) { + // Create a request and add it to the request queue + req := newFundRequest(ctx, amt) + + a.lk.Lock() + *reqs = append(*reqs, req) + a.lk.Unlock() + + // Process the queue + go a.process() + + // Wait for the results + select { + case <-ctx.Done(): + return waitSentinelUndef, ctx.Err() + case r := <-req.Result: + return waitSentinel(r.msgCid), r.err + } +} + +// Used by the tests +func (a *fundedAddress) onProcessStart(fn func() bool) { + a.lk.Lock() + defer a.lk.Unlock() + + a.onProcessStartListener = fn +} + +// Process queued requests +func (a *fundedAddress) process() { + a.lk.Lock() + defer a.lk.Unlock() + + // Used by the tests + if a.onProcessStartListener != nil { + done := a.onProcessStartListener() + if done { + a.onProcessStartListener = nil + } + } + + // Check if we're still waiting for the response to a message + if a.state.MsgCid != nil { + return + } + + // Check if there's anything to do + if len(a.reservations) == 0 && len(a.releases) == 0 && len(a.withdrawals) == 0 { + return + } + + res, _ := a.processRequests() + + a.reservations = filterOutProcessedReqs(a.reservations) + a.releases = filterOutProcessedReqs(a.releases) + a.withdrawals = filterOutProcessedReqs(a.withdrawals) + + a.applyStateChange(res) +} + +// Filter out completed requests +func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest { + filtered := make([]*fundRequest, 0, len(reqs)) + for _, req := range reqs { + if !req.Completed() { + filtered = append(filtered, req) + } + } + return filtered +} + +// Apply the results of processing queues and save to the datastore +func (a *fundedAddress) applyStateChange(res *processResult) { + a.state.MsgCid = res.msgCid + a.state.AmtReserved = res.amtReserved + a.saveState() +} + +// Clear the pending message cid so that a new message can be sent +func (a *fundedAddress) clearWaitState() { + a.state.MsgCid = nil + a.saveState() +} + +// Save state to datastore +func (a *fundedAddress) saveState() { + // Not much we can do if saving to the datastore fails, just log + err := a.str.save(a.state) + if err != nil { + log.Errorf("saving state to store for addr %s: %w", a.state.Addr, err) + } +} + +// The result of processing the request queues +type processResult struct { + // The new reserved amount + amtReserved abi.TokenAmount + // The message cid, if a message was pushed + msgCid *cid.Cid +} + +// process request queues and return the resulting changes to state +func (a *fundedAddress) processRequests() (pr *processResult, prerr error) { + // If there's an error, mark reserve requests as errored + defer func() { + if prerr != nil { + for _, req := range a.reservations { + req.Complete(cid.Undef, prerr) + } + } + }() + + // Start with the reserved amount in state + reserved := a.state.AmtReserved + + // Add the amount of each reserve request + for _, req := range a.reservations { + amt := req.Amount() + a.debugf("reserve %d", amt) + reserved = types.BigAdd(reserved, amt) + } + + // Subtract the amount of each release request + for _, req := range a.releases { + amt := req.Amount() + a.debugf("release %d", amt) + reserved = types.BigSub(reserved, amt) + + // Mark release as complete + req.Complete(cid.Undef, nil) + } + + // If reserved amount is negative, set it to zero + if reserved.LessThan(abi.NewTokenAmount(0)) { + reserved = abi.NewTokenAmount(0) + } + + res := &processResult{amtReserved: reserved} + + // Work out the amount to add to the balance + toAdd := abi.NewTokenAmount(0) + + // If the new reserved amount is greater than the existing amount + if reserved.GreaterThan(a.state.AmtReserved) { + a.debugf("reserved %d > state.AmtReserved %d", reserved, a.state.AmtReserved) + + // Get available funds for address + avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr) + if err != nil { + return res, err + } + + // amount to add = new reserved amount - available + toAdd = types.BigSub(reserved, avail) + a.debugf("reserved %d - avail %d = %d", reserved, avail, toAdd) + } + + // If there's nothing to add to the balance + if toAdd.LessThanEqual(abi.NewTokenAmount(0)) { + // Mark reserve requests as complete + for _, req := range a.reservations { + req.Complete(cid.Undef, nil) + } + + // Process withdrawals + return a.processWithdrawals(reserved) + } + + // Add funds to address + a.debugf("add funds %d", toAdd) + addFundsCid, err := a.env.AddFunds(a.ctx, a.state.Wallet, a.state.Addr, toAdd) + if err != nil { + return res, err + } + + // Mark reserve requests as complete + for _, req := range a.reservations { + req.Complete(addFundsCid, nil) + } + + // Start waiting for results (async) + defer a.startWaitForResults(addFundsCid) + + // Save the message CID to state + res.msgCid = &addFundsCid + return res, nil +} + +// process withdrawal queue +func (a *fundedAddress) processWithdrawals(reserved abi.TokenAmount) (pr *processResult, prerr error) { + // If there's an error, mark withdrawal requests as errored + defer func() { + if prerr != nil { + for _, req := range a.withdrawals { + req.Complete(cid.Undef, prerr) + } + } + }() + + res := &processResult{ + amtReserved: reserved, + } + + // Get the net available balance + avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr) + if err != nil { + return res, err + } + + netAvail := types.BigSub(avail, reserved) + + // Fit as many withdrawals as possible into the available balance, and fail + // the rest + withdrawalAmt := abi.NewTokenAmount(0) + allowedAmt := abi.NewTokenAmount(0) + allowed := make([]*fundRequest, 0, len(a.withdrawals)) + for _, req := range a.withdrawals { + amt := req.Amount() + withdrawalAmt = types.BigAdd(withdrawalAmt, amt) + if withdrawalAmt.LessThanEqual(netAvail) { + a.debugf("withdraw %d", amt) + allowed = append(allowed, req) + allowedAmt = types.BigAdd(allowedAmt, amt) + } else { + err := xerrors.Errorf("insufficient funds for withdrawal %d", amt) + a.debugf("%s", err) + req.Complete(cid.Undef, err) + } + } + + // Check if there is anything to withdraw + if allowedAmt.Equals(abi.NewTokenAmount(0)) { + // Mark allowed requests as complete + for _, req := range allowed { + req.Complete(cid.Undef, nil) + } + return res, nil + } + + // Withdraw funds + a.debugf("withdraw funds %d", allowedAmt) + withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, a.state.Wallet, a.state.Addr, allowedAmt) + if err != nil { + return res, err + } + + // Mark allowed requests as complete + for _, req := range allowed { + req.Complete(withdrawFundsCid, nil) + } + + // Start waiting for results of message (async) + defer a.startWaitForResults(withdrawFundsCid) + + // Save the message CID to state + res.msgCid = &withdrawFundsCid + return res, nil +} + +// asynchonously wait for results of message +func (a *fundedAddress) startWaitForResults(msgCid cid.Cid) { + go func() { + err := a.env.WaitMsg(a.ctx, msgCid) + if err != nil { + // We don't really care about the results here, we're just waiting + // so as to only process one on-chain message at a time + log.Errorf("waiting for results of message %s for addr %s: %w", msgCid, a.state.Addr, err) + } + + a.lk.Lock() + a.debugf("complete wait") + a.clearWaitState() + a.lk.Unlock() + + a.process() + }() +} + +func (a *fundedAddress) debugf(args ...interface{}) { + fmtStr := args[0].(string) + args = args[1:] + log.Debugf(a.state.Addr.String()+": "+fmtStr, args...) +} + +// The result of a fund request +type reqResult struct { + msgCid cid.Cid + err error +} + +// A request to change funds +type fundRequest struct { + ctx context.Context + amt abi.TokenAmount + completed chan struct{} + Result chan reqResult +} + +func newFundRequest(ctx context.Context, amt abi.TokenAmount) *fundRequest { + return &fundRequest{ + ctx: ctx, + amt: amt, + Result: make(chan reqResult), + completed: make(chan struct{}), + } +} + +// Amount returns zero if the context has expired +func (frp *fundRequest) Amount() abi.TokenAmount { + if frp.ctx.Err() != nil { + return abi.NewTokenAmount(0) + } + return frp.amt +} + +// Complete is called with the message CID when the funds request has been +// started or with the error if there was an error +func (frp *fundRequest) Complete(msgCid cid.Cid, err error) { + select { + case <-frp.completed: + case <-frp.ctx.Done(): + case frp.Result <- reqResult{msgCid: msgCid, err: err}: + } + close(frp.completed) +} + +// Completed indicates if Complete has already been called +func (frp *fundRequest) Completed() bool { + select { + case <-frp.completed: + return true + default: + return false + } +} + +func (frp *fundRequest) Equals(other *fundRequest) bool { + return frp == other +} + +// fundManagerEnvironment simplifies some API calls +type fundManagerEnvironment struct { + api fundManagerAPI +} + +func (env *fundManagerEnvironment) AvailableFunds(ctx context.Context, addr address.Address) (abi.TokenAmount, error) { + bal, err := env.api.StateMarketBalance(ctx, addr, types.EmptyTSK) + if err != nil { + return abi.NewTokenAmount(0), err + } + + return types.BigSub(bal.Escrow, bal.Locked), nil +} + +func (env *fundManagerEnvironment) AddFunds( + ctx context.Context, + wallet address.Address, + addr address.Address, + amt abi.TokenAmount, +) (cid.Cid, error) { + return env.sendFunds(ctx, wallet, addr, amt) +} + +func (env *fundManagerEnvironment) WithdrawFunds( + ctx context.Context, + wallet address.Address, + addr address.Address, + amt abi.TokenAmount, +) (cid.Cid, error) { + return env.sendFunds(ctx, addr, wallet, amt) +} + +func (env *fundManagerEnvironment) sendFunds( + ctx context.Context, + from address.Address, + to address.Address, + amt abi.TokenAmount, +) (cid.Cid, error) { + params, err := actors.SerializeParams(&to) + if err != nil { + return cid.Undef, err + } + + smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{ + To: market.Address, + From: from, + Value: amt, + Method: market.Methods.AddBalance, + Params: params, + }, nil) + + if aerr != nil { + return cid.Undef, aerr + } + + return smsg.Cid(), nil +} + +func (env *fundManagerEnvironment) WaitMsg(ctx context.Context, c cid.Cid) error { + _, err := env.api.StateWaitMsg(ctx, c, build.MessageConfidence) + return err +} diff --git a/chain/market/fundmanager_test.go b/chain/market/fundmanager_test.go new file mode 100644 index 000000000..a8fabc2fc --- /dev/null +++ b/chain/market/fundmanager_test.go @@ -0,0 +1,506 @@ +package market + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + + "github.com/filecoin-project/go-state-types/abi" + + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" + + "github.com/filecoin-project/lotus/chain/wallet" + + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +// TestFundManagerBasic verifies that the basic fund manager operations work +func TestFundManagerBasic(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + // Reserve 10 + // balance: 0 -> 10 + // reserved: 0 -> 10 + amt := abi.NewTokenAmount(10) + sentinel, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + msg := s.mockApi.getSentMessage(cid.Cid(sentinel)) + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + + s.mockApi.completeMsg(cid.Cid(sentinel)) + err = s.fm.Wait(s.ctx, sentinel) + require.NoError(t, err) + + // Reserve 7 + // balance: 10 -> 17 + // reserved: 10 -> 17 + amt = abi.NewTokenAmount(7) + sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + + s.mockApi.completeMsg(cid.Cid(sentinel)) + err = s.fm.Wait(s.ctx, sentinel) + require.NoError(t, err) + + // Release 5 + // balance: 17 + // reserved: 17 -> 12 + amt = abi.NewTokenAmount(5) + err = s.fm.Release(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + // Withdraw 2 + // balance: 17 -> 15 + // reserved: 12 + amt = abi.NewTokenAmount(2) + sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + checkMessageFields(t, msg, s.acctAddr, s.walletAddr, amt) + + s.mockApi.completeMsg(cid.Cid(sentinel)) + err = s.fm.Wait(s.ctx, sentinel) + require.NoError(t, err) + + // Reserve 3 + // balance: 15 + // reserved: 12 -> 15 + // Note: reserved (15) is <= balance (15) so should not send on-chain + // message + msgCount := s.mockApi.messageCount() + amt = abi.NewTokenAmount(3) + sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + require.Equal(t, msgCount, s.mockApi.messageCount()) + require.Equal(t, sentinel, waitSentinelUndef) + + // Reserve 1 + // balance: 15 -> 16 + // reserved: 15 -> 16 + // Note: reserved (16) is above balance (15) so *should* send on-chain + // message to top up balance + amt = abi.NewTokenAmount(1) + topUp := abi.NewTokenAmount(1) + sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + s.mockApi.completeMsg(cid.Cid(sentinel)) + msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) + + // Withdraw 1 + // balance: 16 + // reserved: 16 + // Note: Expect failure because there is no available balance to withdraw: + // balance - reserved = 16 - 16 = 0 + amt = abi.NewTokenAmount(1) + sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + require.Error(t, err) +} + +// TestFundManagerParallel verifies that operations can be run in parallel +func TestFundManagerParallel(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + // Reserve 10 + amt := abi.NewTokenAmount(10) + sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + // Wait until all the subsequent requests are queued up + queueReady := make(chan struct{}) + fa := s.fm.getFundedAddress(s.acctAddr) + fa.onProcessStart(func() bool { + if len(fa.withdrawals) == 1 && len(fa.reservations) == 2 && len(fa.releases) == 1 { + close(queueReady) + return true + } + return false + }) + + // Withdraw 5 (should not run until after reserves / releases) + withdrawReady := make(chan error) + go func() { + amt = abi.NewTokenAmount(5) + _, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt) + withdrawReady <- err + }() + + reserveSentinels := make(chan waitSentinel) + + // Reserve 3 + go func() { + amt := abi.NewTokenAmount(3) + sentinelReserve3, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + reserveSentinels <- sentinelReserve3 + }() + + // Reserve 5 + go func() { + amt := abi.NewTokenAmount(5) + sentinelReserve5, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + reserveSentinels <- sentinelReserve5 + }() + + // Release 2 + go func() { + amt := abi.NewTokenAmount(2) + err = s.fm.Release(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + }() + + // Everything is queued up + <-queueReady + + // Complete the "Reserve 10" message + s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) + msg := s.mockApi.getSentMessage(cid.Cid(sentinelReserve10)) + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) + + // The other requests should now be combined and be submitted on-chain as + // a single message + rs1 := <-reserveSentinels + rs2 := <-reserveSentinels + require.Equal(t, rs1, rs2) + + // Withdraw should not have been called yet, because reserve / release + // requests run first + select { + case <-withdrawReady: + require.Fail(t, "Withdraw should run after reserve / release") + default: + } + + // Complete the message + s.mockApi.completeMsg(cid.Cid(rs1)) + msg = s.mockApi.getSentMessage(cid.Cid(rs1)) + + // "Reserve 3" +3 + // "Reserve 5" +5 + // "Release 2" -2 + // Result: 6 + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6)) + + // Expect withdraw to fail because not enough available funds + err = <-withdrawReady + require.Error(t, err) +} + +// TestFundManagerWithdrawal verifies that as many withdraw operations as +// possible are processed +func TestFundManagerWithdrawal(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + // Reserve 10 + amt := abi.NewTokenAmount(10) + sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + // Complete the "Reserve 10" message + s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) + + // Release 10 + err = s.fm.Release(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + // Available 10 + // Withdraw 6 + // Expect success + amt = abi.NewTokenAmount(6) + sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) + err = s.fm.Wait(s.ctx, sentinelWithdraw) + require.NoError(t, err) + + // Available 4 + // Withdraw 4 + // Expect success + amt = abi.NewTokenAmount(4) + sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) + err = s.fm.Wait(s.ctx, sentinelWithdraw) + require.NoError(t, err) + + // Available 0 + // Withdraw 1 + // Expect FAIL + amt = abi.NewTokenAmount(1) + sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + require.Error(t, err) +} + +// TestFundManagerRestart verifies that waiting for incomplete requests resumes +// on restart +func TestFundManagerRestart(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + acctAddr2 := tutils.NewActorAddr(t, "addr2") + + // Address 1: Reserve 10 + amt := abi.NewTokenAmount(10) + sentinelAddr1, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + require.NoError(t, err) + + msg := s.mockApi.getSentMessage(cid.Cid(sentinelAddr1)) + checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + + // Address 2: Reserve 7 + amt2 := abi.NewTokenAmount(7) + sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, acctAddr2, amt2) + require.NoError(t, err) + + msg2 := s.mockApi.getSentMessage(cid.Cid(sentinelAddr2Res7)) + checkMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) + + // Complete "Address 1: Reserve 10" + s.mockApi.completeMsg(cid.Cid(sentinelAddr1)) + err = s.fm.Wait(s.ctx, sentinelAddr1) + require.NoError(t, err) + + // Give the completed state a moment to be stored before restart + time.Sleep(time.Millisecond * 10) + + // Restart + mockApiAfter := s.mockApi + fmAfter := NewFundManager(mockApiAfter, s.ds, s.walletAddr) + fmAfter.Start() + + amt3 := abi.NewTokenAmount(9) + reserveSentinel := make(chan waitSentinel) + go func() { + // Address 2: Reserve 9 + sentinel3, err := fmAfter.Reserve(s.ctx, acctAddr2, amt3) + require.NoError(t, err) + reserveSentinel <- sentinel3 + }() + + // Expect no message to be sent, because still waiting for previous + // message "Address 2: Reserve 7" to complete on-chain + select { + case <-reserveSentinel: + require.Fail(t, "Expected no message to be sent") + case <-time.After(10 * time.Millisecond): + } + + // Complete "Address 2: Reserve 7" + mockApiAfter.completeMsg(cid.Cid(sentinelAddr2Res7)) + err = fmAfter.Wait(s.ctx, sentinelAddr2Res7) + require.NoError(t, err) + + // Expect waiting message to now be sent + sentinel3 := <-reserveSentinel + msg3 := mockApiAfter.getSentMessage(cid.Cid(sentinel3)) + checkMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3) +} + +type scaffold struct { + ctx context.Context + ds *ds_sync.MutexDatastore + walletAddr address.Address + acctAddr address.Address + mockApi *mockFundManagerAPI + fm *FundManager +} + +func setup(t *testing.T) *scaffold { + ctx := context.Background() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + walletAddr, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + acctAddr := tutils.NewActorAddr(t, "addr") + + mockApi := newMockFundManagerAPI(walletAddr) + ds := ds_sync.MutexWrap(ds.NewMapDatastore()) + fm := NewFundManager(mockApi, ds, walletAddr) + return &scaffold{ + ctx: ctx, + ds: ds, + walletAddr: walletAddr, + acctAddr: acctAddr, + mockApi: mockApi, + fm: fm, + } +} + +func checkMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) { + require.Equal(t, from, msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, amt, msg.Value) + + var paramsTo address.Address + err := paramsTo.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Equal(t, to, paramsTo) +} + +type sentMsg struct { + msg *types.SignedMessage + ready chan struct{} +} + +type mockFundManagerAPI struct { + wallet address.Address + + lk sync.Mutex + escrow map[address.Address]abi.TokenAmount + sentMsgs map[cid.Cid]*sentMsg + completedMsgs map[cid.Cid]struct{} + waitingFor map[cid.Cid]chan struct{} +} + +func newMockFundManagerAPI(wallet address.Address) *mockFundManagerAPI { + return &mockFundManagerAPI{ + wallet: wallet, + escrow: make(map[address.Address]abi.TokenAmount), + sentMsgs: make(map[cid.Cid]*sentMsg), + completedMsgs: make(map[cid.Cid]struct{}), + waitingFor: make(map[cid.Cid]chan struct{}), + } +} + +func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { + mapi.lk.Lock() + defer mapi.lk.Unlock() + + smsg := &types.SignedMessage{Message: *message} + mapi.sentMsgs[smsg.Cid()] = &sentMsg{msg: smsg, ready: make(chan struct{})} + + return smsg, nil +} + +func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message { + mapi.lk.Lock() + defer mapi.lk.Unlock() + + for i := 0; i < 1000; i++ { + if pending, ok := mapi.sentMsgs[c]; ok { + return &pending.msg.Message + } + time.Sleep(time.Millisecond) + } + panic("expected message to be sent") +} + +func (mapi *mockFundManagerAPI) messageCount() int { + mapi.lk.Lock() + defer mapi.lk.Unlock() + + return len(mapi.sentMsgs) +} + +func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) { + mapi.lk.Lock() + + pmsg, ok := mapi.sentMsgs[msgCid] + if ok { + if pmsg.msg.Message.From == mapi.wallet { + var escrowAcct address.Address + err := escrowAcct.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params)) + if err != nil { + panic(err) + } + + escrow := mapi.getEscrow(escrowAcct) + before := escrow + escrow = types.BigAdd(escrow, pmsg.msg.Message.Value) + mapi.escrow[escrowAcct] = escrow + log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow) + } else { + escrowAcct := pmsg.msg.Message.From + escrow := mapi.getEscrow(escrowAcct) + before := escrow + escrow = types.BigSub(escrow, pmsg.msg.Message.Value) + mapi.escrow[escrowAcct] = escrow + log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow) + } + } + + mapi.completedMsgs[msgCid] = struct{}{} + + ready, ok := mapi.waitingFor[msgCid] + + mapi.lk.Unlock() + + if ok { + close(ready) + } +} + +func (mapi *mockFundManagerAPI) StateMarketBalance(ctx context.Context, a address.Address, key types.TipSetKey) (api.MarketBalance, error) { + mapi.lk.Lock() + defer mapi.lk.Unlock() + + return api.MarketBalance{ + Locked: abi.NewTokenAmount(0), + Escrow: mapi.getEscrow(a), + }, nil +} + +func (mapi *mockFundManagerAPI) getEscrow(a address.Address) abi.TokenAmount { + escrow := mapi.escrow[a] + if escrow.Nil() { + return abi.NewTokenAmount(0) + } + return escrow +} + +func (mapi *mockFundManagerAPI) StateWaitMsg(ctx context.Context, c cid.Cid, confidence uint64) (*api.MsgLookup, error) { + res := &api.MsgLookup{ + Message: c, + Receipt: types.MessageReceipt{ + ExitCode: 0, + Return: nil, + }, + } + ready := make(chan struct{}) + + mapi.lk.Lock() + _, ok := mapi.completedMsgs[c] + if !ok { + mapi.waitingFor[c] = ready + } + mapi.lk.Unlock() + + if !ok { + select { + case <-ctx.Done(): + case <-ready: + } + } + return res, nil +} diff --git a/chain/market/fundmgr.go b/chain/market/fundmgr.go index 50467a6e1..a09b5b96e 100644 --- a/chain/market/fundmgr.go +++ b/chain/market/fundmgr.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" "go.uber.org/fx" "github.com/filecoin-project/lotus/api" @@ -20,8 +19,6 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" ) -var log = logging.Logger("market_adapter") - // API is the dependencies need to run a fund manager type API struct { fx.In diff --git a/chain/market/store.go b/chain/market/store.go new file mode 100644 index 000000000..328d0c089 --- /dev/null +++ b/chain/market/store.go @@ -0,0 +1,73 @@ +package market + +import ( + "bytes" + + cborrpc "github.com/filecoin-project/go-cbor-util" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + dsq "github.com/ipfs/go-datastore/query" + + "github.com/filecoin-project/go-address" + + "github.com/filecoin-project/lotus/node/modules/dtypes" +) + +const dsKeyAddr = "Addr" + +type Store struct { + ds datastore.Batching +} + +func newStore(ds dtypes.MetadataDS) *Store { + ds = namespace.Wrap(ds, datastore.NewKey("/fundmgr/")) + return &Store{ + ds: ds, + } +} + +// save the state to the datastore +func (ps *Store) save(state *FundedAddressState) error { + k := dskeyForAddr(state.Addr) + + b, err := cborrpc.Dump(state) + if err != nil { + return err + } + + return ps.ds.Put(k, b) +} + +// forEach calls iter with each address in the datastore +func (ps *Store) forEach(iter func(*FundedAddressState)) error { + res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr}) + if err != nil { + return err + } + defer res.Close() //nolint:errcheck + + for { + res, ok := res.NextSync() + if !ok { + break + } + + if res.Error != nil { + return err + } + + var stored FundedAddressState + if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil { + return err + } + + iter(&stored) + } + + return nil +} + +// The datastore key used to identify the address state +func dskeyForAddr(addr address.Address) datastore.Key { + return datastore.KeyWithNamespaces([]string{dsKeyAddr, addr.String()}) +} diff --git a/gen/main.go b/gen/main.go index c2a6d009b..9009172b9 100644 --- a/gen/main.go +++ b/gen/main.go @@ -4,6 +4,8 @@ import ( "fmt" "os" + "github.com/filecoin-project/lotus/chain/market" + gen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/lotus/api" @@ -67,6 +69,14 @@ func main() { os.Exit(1) } + err = gen.WriteTupleEncodersToFile("./chain/market/cbor_gen.go", "market", + market.FundedAddressState{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange", exchange.Request{}, exchange.Response{},