Merge pull request #2994 from filecoin-project/ingar/retrieval-logging

Generate more useful output during client retrieval
This commit is contained in:
Łukasz Magiera 2020-08-12 20:36:14 +02:00 committed by GitHub
commit 491e24c90a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 39 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/lotus/chain/types"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
@ -236,7 +237,7 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error)
// ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
// ClientCalcCommP calculates the CommP for a specified file, based on the sector size of the provided miner.

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
@ -133,7 +134,7 @@ type FullNodeStruct struct {
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
@ -417,7 +418,7 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return c.Internal.ClientRetrieve(ctx, order, ref)
}

View File

@ -398,9 +398,11 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu
Path: filepath.Join(rpath, "ret"),
IsCAR: carExport,
}
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
if err != nil {
t.Fatalf("%+v", err)
updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
for update := range updates {
if update.Err != "" {
t.Fatalf("%v", err)
}
}
rdata, err := ioutil.ReadFile(filepath.Join(rpath, "ret"))

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/go-units"
"github.com/fatih/color"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
@ -837,12 +838,33 @@ var clientRetrieveCmd = &cli.Command{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
}
if err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref); err != nil {
return xerrors.Errorf("Retrieval Failed: %w", err)
updates, err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref)
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)
}
for {
select {
case evt, chOpen := <-updates:
fmt.Printf("> Recv: %s, Paid %s, %s (%s)\n",
types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.FundsSpent),
retrievalmarket.ClientEvents[evt.Event],
retrievalmarket.DealStatuses[evt.Status],
)
if !chOpen {
fmt.Println("Success")
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %v", err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
},
}

View File

@ -3,6 +3,7 @@ package marketevents
import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log/v2"
)
@ -27,3 +28,11 @@ func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmark
func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}
type RetrievalEvent struct {
Event retrievalmarket.ClientEvent
Status retrievalmarket.DealStatus
BytesReceived uint64
FundsSpent abi.TokenAmount
Err string
}

View File

@ -35,11 +35,13 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
@ -398,11 +400,28 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
return out, nil
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
return events, nil
}
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
errStr := ""
if e != nil {
errStr = e.Error()
}
events <- marketevents.RetrievalEvent{Err: errStr}
}
if order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
return err
finish(err)
return
}
order.MinerPeer = retrievalmarket.RetrievalPeer{
@ -412,7 +431,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
if order.Size == 0 {
return xerrors.Errorf("cannot make retrieval deal for zero bytes")
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
return
}
/*id, st, err := a.imgr().NewStore()
@ -427,6 +447,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) {
events <- marketevents.RetrievalEvent{
Event: event,
Status: state.Status,
BytesReceived: state.TotalReceived,
FundsSpent: state.FundsSpent,
}
switch state.Status {
case rm.DealStatusCompleted:
retrievalResult <- nil
@ -444,12 +472,14 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
if err != nil {
return xerrors.Errorf("Error in retrieval params: %s", err)
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
}
store, err := a.RetrievalStoreMgr.NewStore()
if err != nil {
return xerrors.Errorf("Error setting up new store: %w", err)
finish(xerrors.Errorf("Error setting up new store: %w", err))
return
}
defer func() {
@ -467,14 +497,18 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
store.StoreID())
if err != nil {
return xerrors.Errorf("Retrieve failed: %w", err)
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
finish(xerrors.New("Retrieval Timed Out"))
return
case err := <-retrievalResult:
if err != nil {
return xerrors.Errorf("Retrieve: %w", err)
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
}
@ -482,7 +516,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
return nil
finish(nil)
return
}
rdag := store.DAGService()
@ -490,24 +525,30 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
finish(err)
return
}
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
if err != nil {
return err
finish(err)
return
}
return f.Close()
finish(f.Close())
return
}
nd, err := rdag.Get(ctx, order.Root)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
return files.WriteTo(file, ref.Path)
finish(files.WriteTo(file, ref.Path))
return
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {