From 53e06f358a037e71b8dc96fb89fd0fed767251c6 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Tue, 11 Aug 2020 13:04:00 -0700 Subject: [PATCH 1/3] client retrieval logging --- api/api_full.go | 3 +- api/apistruct/struct.go | 31 ++++++++--------- api/test/deals.go | 14 ++++++-- cli/client.go | 29 +++++++++++++--- markets/loggers/loggers.go | 9 +++++ node/impl/client/client.go | 69 +++++++++++++++++++++++++++++--------- 6 files changed, 116 insertions(+), 39 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 654a8620f..2775946de 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/lotus/chain/types" + marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" ) @@ -236,7 +237,7 @@ type FullNode interface { // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) // ClientRetrieve initiates the retrieval of a file, as specified in the order. - ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error + ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) // ClientQueryAsk returns a signed StorageAsk from the specified miner. ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) // ClientCalcCommP calculates the CommP for a specified file, based on the sector size of the provided miner. diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index a571e4564..a0f50f752 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-multistore" + marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/sector-storage/fsutil" "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" @@ -122,20 +123,20 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` - ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` - ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` - ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` - ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` - ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` - ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` - ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` - ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` - ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` - ClientCalcCommP func(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) `perm:"read"` - ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` - ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` + ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` + ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` + ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` + ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` + ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` + ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` + ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` + ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` + ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` + ClientCalcCommP func(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) `perm:"read"` + ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` + ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -415,7 +416,7 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e return c.Internal.ClientListDeals(ctx) } -func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { +func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { return c.Internal.ClientRetrieve(ctx, order, ref) } diff --git a/api/test/deals.go b/api/test/deals.go index 37ff780f7..0abc0a419 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -398,9 +398,17 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu Path: filepath.Join(rpath, "ret"), IsCAR: carExport, } - err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) - if err != nil { - t.Fatalf("%+v", err) + updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) + for { + select { + case update, ok := <-updates: + if !ok { + break + } + if update.Err != nil { + t.Fatalf("%+v", err) + } + } } rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret")) diff --git a/cli/client.go b/cli/client.go index e35344977..97db9bfe5 100644 --- a/cli/client.go +++ b/cli/client.go @@ -12,6 +12,7 @@ import ( "github.com/docker/go-units" "github.com/fatih/color" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" "github.com/libp2p/go-libp2p-core/peer" @@ -837,12 +838,32 @@ var clientRetrieveCmd = &cli.Command{ Path: cctx.Args().Get(1), IsCAR: cctx.Bool("car"), } - if err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref); err != nil { - return xerrors.Errorf("Retrieval Failed: %w", err) + updates, err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref) + if err != nil { + return xerrors.Errorf("error setting up retrieval: %w", err) } - fmt.Println("Success") - return nil + for { + select { + case evt, chOpen := <-updates: + + fmt.Printf("Retrieval Event: %s, State: %s, BytesReceived: %d, PaymentSent: %s\n", + retrievalmarket.ClientEvents[evt.Event], + retrievalmarket.DealStatuses[evt.Status], + evt.BytesReceived, + evt.FundsSpent.String(), + ) + + if !chOpen { + fmt.Println("Success") + return nil + } + + if evt.Err != nil { + return xerrors.Errorf("Retrieval Failed: %w", err) + } + } + } }, } diff --git a/markets/loggers/loggers.go b/markets/loggers/loggers.go index 8ebf54fd2..58dc6d97d 100644 --- a/markets/loggers/loggers.go +++ b/markets/loggers/loggers.go @@ -3,6 +3,7 @@ package marketevents import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/specs-actors/actors/abi" logging "github.com/ipfs/go-log/v2" ) @@ -27,3 +28,11 @@ func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmark func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } + +type RetrievalEvent struct { + Event retrievalmarket.ClientEvent + Status retrievalmarket.DealStatus + BytesReceived uint64 + FundsSpent abi.TokenAmount + Err error +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index cf13c2bfb..befcae0b1 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -35,11 +35,13 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-padreader" - "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + marketevents "github.com/filecoin-project/lotus/markets/loggers" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" @@ -398,11 +400,24 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { return out, nil } -func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { +func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { + events := make(chan marketevents.RetrievalEvent) + go a.clientRetrieve(ctx, order, ref, events) + return events, nil +} + +func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { + defer close(events) + + finish := func(e error) { + events <- marketevents.RetrievalEvent{Err: e} + } + if order.MinerPeer.ID == "" { mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) if err != nil { - return err + finish(err) + return } order.MinerPeer = retrievalmarket.RetrievalPeer{ @@ -412,7 +427,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } if order.Size == 0 { - return xerrors.Errorf("cannot make retrieval deal for zero bytes") + finish(xerrors.Errorf("cannot make retrieval deal for zero bytes")) + return } /*id, st, err := a.imgr().NewStore() @@ -427,6 +443,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { if state.PayloadCID.Equals(order.Root) { + + events <- marketevents.RetrievalEvent{ + Event: event, + Status: state.Status, + BytesReceived: state.TotalReceived, + FundsSpent: state.FundsSpent, + } + switch state.Status { case rm.DealStatusCompleted: retrievalResult <- nil @@ -444,12 +468,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) if err != nil { - return xerrors.Errorf("Error in retrieval params: %s", err) + finish(xerrors.Errorf("Error in retrieval params: %s", err)) + return } store, err := a.RetrievalStoreMgr.NewStore() if err != nil { - return xerrors.Errorf("Error setting up new store: %w", err) + finish(xerrors.Errorf("Error setting up new store: %w", err)) + return } defer func() { @@ -467,14 +493,18 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref store.StoreID()) if err != nil { - return xerrors.Errorf("Retrieve failed: %w", err) + finish(xerrors.Errorf("Retrieve failed: %w", err)) + return } + select { case <-ctx.Done(): - return xerrors.New("Retrieval Timed Out") + finish(xerrors.New("Retrieval Timed Out")) + return case err := <-retrievalResult: if err != nil { - return xerrors.Errorf("Retrieve: %w", err) + finish(xerrors.Errorf("Retrieve: %w", err)) + return } } @@ -482,7 +512,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref // If ref is nil, it only fetches the data into the configured blockstore. if ref == nil { - return nil + finish(nil) + return } rdag := store.DAGService() @@ -490,24 +521,30 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - return err + finish(err) + return } err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) if err != nil { - return err + finish(err) + return } - return f.Close() + finish(f.Close()) + return } nd, err := rdag.Get(ctx, order.Root) if err != nil { - return xerrors.Errorf("ClientRetrieve: %w", err) + finish(xerrors.Errorf("ClientRetrieve: %w", err)) + return } file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) if err != nil { - return xerrors.Errorf("ClientRetrieve: %w", err) + finish(xerrors.Errorf("ClientRetrieve: %w", err)) + return } - return files.WriteTo(file, ref.Path) + finish(files.WriteTo(file, ref.Path)) + return } func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) { From 02b0c8dd94f06dfa772922e14d0fa0df36a9e162 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Tue, 11 Aug 2020 13:48:56 -0700 Subject: [PATCH 2/3] Watch context for completion also --- api/test/deals.go | 12 +++--------- cli/client.go | 4 +++- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 0abc0a419..5b5f04e41 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -399,15 +399,9 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu IsCAR: carExport, } updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) - for { - select { - case update, ok := <-updates: - if !ok { - break - } - if update.Err != nil { - t.Fatalf("%+v", err) - } + for update := range updates { + if update.Err != nil { + t.Fatalf("%+v", err) } } diff --git a/cli/client.go b/cli/client.go index 97db9bfe5..5c2574364 100644 --- a/cli/client.go +++ b/cli/client.go @@ -860,8 +860,10 @@ var clientRetrieveCmd = &cli.Command{ } if evt.Err != nil { - return xerrors.Errorf("Retrieval Failed: %w", err) + return xerrors.Errorf("retrieval failed: %w", err) } + case <-ctx.Done(): + return xerrors.Errorf("retrieval timed out") } } }, From 8f56814ffb5b6b257132f4495c6995a88ae97241 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Tue, 11 Aug 2020 16:49:11 -0700 Subject: [PATCH 3/3] Change formatting, stringify errors before returning over JSONRPC --- api/test/deals.go | 4 ++-- cli/client.go | 11 +++++------ markets/loggers/loggers.go | 2 +- node/impl/client/client.go | 6 +++++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/api/test/deals.go b/api/test/deals.go index 5b5f04e41..289445083 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -400,8 +400,8 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu } updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) for update := range updates { - if update.Err != nil { - t.Fatalf("%+v", err) + if update.Err != "" { + t.Fatalf("%v", err) } } diff --git a/cli/client.go b/cli/client.go index 5c2574364..5bc0f4a0d 100644 --- a/cli/client.go +++ b/cli/client.go @@ -846,12 +846,11 @@ var clientRetrieveCmd = &cli.Command{ for { select { case evt, chOpen := <-updates: - - fmt.Printf("Retrieval Event: %s, State: %s, BytesReceived: %d, PaymentSent: %s\n", + fmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", + types.SizeStr(types.NewInt(evt.BytesReceived)), + types.FIL(evt.FundsSpent), retrievalmarket.ClientEvents[evt.Event], retrievalmarket.DealStatuses[evt.Status], - evt.BytesReceived, - evt.FundsSpent.String(), ) if !chOpen { @@ -859,8 +858,8 @@ var clientRetrieveCmd = &cli.Command{ return nil } - if evt.Err != nil { - return xerrors.Errorf("retrieval failed: %w", err) + if evt.Err != "" { + return xerrors.Errorf("retrieval failed: %v", err) } case <-ctx.Done(): return xerrors.Errorf("retrieval timed out") diff --git a/markets/loggers/loggers.go b/markets/loggers/loggers.go index 58dc6d97d..6f386dbba 100644 --- a/markets/loggers/loggers.go +++ b/markets/loggers/loggers.go @@ -34,5 +34,5 @@ type RetrievalEvent struct { Status retrievalmarket.DealStatus BytesReceived uint64 FundsSpent abi.TokenAmount - Err error + Err string } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index befcae0b1..2823ca263 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -410,7 +410,11 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref defer close(events) finish := func(e error) { - events <- marketevents.RetrievalEvent{Err: e} + errStr := "" + if e != nil { + errStr = e.Error() + } + events <- marketevents.RetrievalEvent{Err: errStr} } if order.MinerPeer.ID == "" {