package v0api import ( "context" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" "golang.org/x/xerrors" "github.com/ipfs/go-cid" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" ) type WrapperV1Full struct { v1api.FullNode } func (w *WrapperV1Full) StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) { return w.FullNode.StateSearchMsg(ctx, types.EmptyTSK, msg, api.LookbackNoLimit, true) } func (w *WrapperV1Full) StateSearchMsgLimited(ctx context.Context, msg cid.Cid, limit abi.ChainEpoch) (*api.MsgLookup, error) { return w.FullNode.StateSearchMsg(ctx, types.EmptyTSK, msg, limit, true) } func (w *WrapperV1Full) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { return w.FullNode.StateWaitMsg(ctx, msg, confidence, api.LookbackNoLimit, true) } func (w *WrapperV1Full) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, limit abi.ChainEpoch) (*api.MsgLookup, error) { return w.FullNode.StateWaitMsg(ctx, msg, confidence, limit, true) } func (w *WrapperV1Full) StateGetReceipt(ctx context.Context, msg cid.Cid, from types.TipSetKey) (*types.MessageReceipt, error) { ml, err := w.FullNode.StateSearchMsg(ctx, from, msg, api.LookbackNoLimit, true) if err != nil { return nil, err } if ml == nil { return nil, nil } return &ml.Receipt, nil } func (w *WrapperV1Full) Version(ctx context.Context) (api.APIVersion, error) { ver, err := w.FullNode.Version(ctx) if err != nil { return api.APIVersion{}, err } ver.APIVersion = api.FullAPIVersion0 return ver, nil } func (w *WrapperV1Full) executePrototype(ctx context.Context, p *api.MessagePrototype) (cid.Cid, error) { sm, err := w.FullNode.MpoolPushMessage(ctx, &p.Message, nil) if err != nil { return cid.Undef, xerrors.Errorf("pushing message: %w", err) } return sm.Cid(), nil } func (w *WrapperV1Full) MsigCreate(ctx context.Context, req uint64, addrs []address.Address, duration abi.ChainEpoch, val types.BigInt, src address.Address, gp types.BigInt) (cid.Cid, error) { p, err := w.FullNode.MsigCreate(ctx, req, addrs, duration, val, src, gp) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigPropose(ctx context.Context, msig address.Address, to address.Address, amt types.BigInt, src address.Address, method uint64, params []byte) (cid.Cid, error) { p, err := w.FullNode.MsigPropose(ctx, msig, to, amt, src, method, params) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigApprove(ctx context.Context, msig address.Address, txID uint64, src address.Address) (cid.Cid, error) { p, err := w.FullNode.MsigApprove(ctx, msig, txID, src) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigApproveTxnHash(ctx context.Context, msig address.Address, txID uint64, proposer address.Address, to address.Address, amt types.BigInt, src address.Address, method uint64, params []byte) (cid.Cid, error) { p, err := w.FullNode.MsigApproveTxnHash(ctx, msig, txID, proposer, to, amt, src, method, params) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigCancel(ctx context.Context, msig address.Address, txID uint64, to address.Address, amt types.BigInt, src address.Address, method uint64, params []byte) (cid.Cid, error) { p, err := w.FullNode.MsigCancelTxnHash(ctx, msig, txID, to, amt, src, method, params) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigAddPropose(ctx context.Context, msig address.Address, src address.Address, newAdd address.Address, inc bool) (cid.Cid, error) { p, err := w.FullNode.MsigAddPropose(ctx, msig, src, newAdd, inc) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigAddApprove(ctx context.Context, msig address.Address, src address.Address, txID uint64, proposer address.Address, newAdd address.Address, inc bool) (cid.Cid, error) { p, err := w.FullNode.MsigAddApprove(ctx, msig, src, txID, proposer, newAdd, inc) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigAddCancel(ctx context.Context, msig address.Address, src address.Address, txID uint64, newAdd address.Address, inc bool) (cid.Cid, error) { p, err := w.FullNode.MsigAddCancel(ctx, msig, src, txID, newAdd, inc) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigSwapPropose(ctx context.Context, msig address.Address, src address.Address, oldAdd address.Address, newAdd address.Address) (cid.Cid, error) { p, err := w.FullNode.MsigSwapPropose(ctx, msig, src, oldAdd, newAdd) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigSwapApprove(ctx context.Context, msig address.Address, src address.Address, txID uint64, proposer address.Address, oldAdd address.Address, newAdd address.Address) (cid.Cid, error) { p, err := w.FullNode.MsigSwapApprove(ctx, msig, src, txID, proposer, oldAdd, newAdd) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigSwapCancel(ctx context.Context, msig address.Address, src address.Address, txID uint64, oldAdd address.Address, newAdd address.Address) (cid.Cid, error) { p, err := w.FullNode.MsigSwapCancel(ctx, msig, src, txID, oldAdd, newAdd) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) MsigRemoveSigner(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) { p, err := w.FullNode.MsigRemoveSigner(ctx, msig, proposer, toRemove, decrease) if err != nil { return cid.Undef, xerrors.Errorf("creating prototype: %w", err) } return w.executePrototype(ctx, p) } func (w *WrapperV1Full) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { return w.StateGetRandomnessFromTickets(ctx, personalization, randEpoch, entropy, tsk) } func (w *WrapperV1Full) ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { return w.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk) } func (w *WrapperV1Full) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error { events := make(chan marketevents.RetrievalEvent) go w.clientRetrieve(ctx, order, ref, events) for { select { case evt, ok := <-events: if !ok { // done successfully return nil } if evt.Err != "" { return xerrors.Errorf("retrieval failed: %s", evt.Err) } case <-ctx.Done(): return xerrors.Errorf("retrieval timed out") } } } func (w *WrapperV1Full) ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { events := make(chan marketevents.RetrievalEvent) go w.clientRetrieve(ctx, order, ref, events) return events, nil } func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents <-chan api.RetrievalInfo, events chan marketevents.RetrievalEvent) error { for { var subscribeEvent api.RetrievalInfo var evt retrievalmarket.ClientEvent select { case <-ctx.Done(): return xerrors.New("Retrieval Timed Out") case subscribeEvent = <-subscribeEvents: if subscribeEvent.ID != dealID { // we can't check the deal ID ahead of time because: // 1. We need to subscribe before retrieving. // 2. We won't know the deal ID until after retrieving. continue } if subscribeEvent.Event != nil { evt = *subscribeEvent.Event } } select { case <-ctx.Done(): return xerrors.New("Retrieval Timed Out") case events <- marketevents.RetrievalEvent{ Event: evt, Status: subscribeEvent.Status, BytesReceived: subscribeEvent.BytesReceived, FundsSpent: subscribeEvent.TotalPaid, }: } switch subscribeEvent.Status { case retrievalmarket.DealStatusCompleted: return nil case retrievalmarket.DealStatusRejected: return xerrors.Errorf("Retrieval Proposal Rejected: %s", subscribeEvent.Message) case retrievalmarket.DealStatusDealNotFound, retrievalmarket.DealStatusErrored: return xerrors.Errorf("Retrieval Error: %s", subscribeEvent.Message) } } } func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { defer close(events) finish := func(e error) { if e != nil { events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()} } } var dealID retrievalmarket.DealID if order.FromLocalCAR == "" { // Subscribe to events before retrieving to avoid losing events. subscribeCtx, cancel := context.WithCancel(ctx) defer cancel() retrievalEvents, err := w.ClientGetRetrievalUpdates(subscribeCtx) if err != nil { finish(xerrors.Errorf("GetRetrievalUpdates failed: %w", err)) return } retrievalRes, err := w.FullNode.ClientRetrieve(ctx, api.RetrievalOrder{ Root: order.Root, Piece: order.Piece, Size: order.Size, Total: order.Total, UnsealPrice: order.UnsealPrice, PaymentInterval: order.PaymentInterval, PaymentIntervalIncrease: order.PaymentIntervalIncrease, Client: order.Client, Miner: order.Miner, MinerPeer: order.MinerPeer, }) if err != nil { finish(xerrors.Errorf("Retrieve failed: %w", err)) return } dealID = retrievalRes.DealID err = readSubscribeEvents(ctx, retrievalRes.DealID, retrievalEvents, events) if err != nil { finish(xerrors.Errorf("Retrieve: %w", err)) return } } // If ref is nil, it only fetches the data into the configured blockstore. if ref == nil { finish(nil) return } eref := api.ExportRef{ Root: order.Root, FromLocalCAR: order.FromLocalCAR, DealID: dealID, } if order.DatamodelPathSelector != nil { s := api.Selector(*order.DatamodelPathSelector) eref.DAGs = append(eref.DAGs, api.DagSpec{ DataSelector: &s, ExportMerkleProof: true, }) } finish(w.ClientExport(ctx, eref, *ref)) } func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { return w.FullNode.PaychFund(ctx, from, to, amt) } func (w *WrapperV1Full) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { a, err := w.FullNode.ClientQueryAsk(ctx, p, miner) if err != nil { return nil, err } return a.Response, nil } func (w *WrapperV1Full) BeaconGetEntry(ctx context.Context, epoch abi.ChainEpoch) (*types.BeaconEntry, error) { return w.StateGetBeaconEntry(ctx, epoch) } var _ FullNode = &WrapperV1Full{}