diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index 7d208909f..65c1a94c0 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -7,11 +7,13 @@ import ( "github.com/fatih/color" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/lib/nullreader" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/provider/lpmarket" "github.com/filecoin-project/lotus/provider/lpmarket/fakelm" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/ipfs/go-cid" "github.com/mitchellh/go-homedir" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" @@ -22,6 +24,8 @@ import ( "net/http/httptest" "net/url" "os" + "sync" + "time" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" @@ -378,6 +382,20 @@ var lpBoostProxyCmd = &cli.Command{ lp := fakelm.NewLMRPCProvider(si, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, cctx.String("layers")) + laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen")) + if err != nil { + return xerrors.Errorf("net resolve: %w", err) + } + + if len(laddr.IP) == 0 { + // set localhost + laddr.IP = net.IPv4(127, 0, 0, 1) + } + rootUrl := url.URL{ + Scheme: "http", + Host: laddr.String(), + } + ast := api.StorageMinerStruct{} ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) { @@ -398,8 +416,57 @@ var lpBoostProxyCmd = &cli.Command{ ast.Internal.SectorsListInStates = lp.SectorsListInStates ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal ast.Internal.ComputeDataCid = lp.ComputeDataCid - ast.Internal.SectorAddPieceToAny = func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data, p3 api.PieceDealInfo) (api.SectorOffset, error) { - panic("implement me") + + type pieceInfo struct { + data storiface.Data + size abi.UnpaddedPieceSize + + done chan struct{} + } + + pieceInfoLk := new(sync.Mutex) + pieceInfos := map[cid.Cid][]pieceInfo{} + + ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { + origPieceData := pieceData + defer func() { + closer, ok := origPieceData.(io.Closer) + if !ok { + log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData) + return + } + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + }() + + pi := pieceInfo{ + data: pieceData, + size: pieceSize, + + done: make(chan struct{}), + } + + pieceInfoLk.Lock() + pieceInfos[deal.DealProposal.PieceCID] = append(pieceInfos[deal.DealProposal.PieceCID], pi) + pieceInfoLk.Unlock() + + // /piece?piece_cid=xxxx + dataUrl := rootUrl + dataUrl.Path = "/piece" + dataUrl.RawQuery = "piece_cid=" + deal.DealProposal.PieceCID.String() + + // make a sector + so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), dataUrl, nil) + if err != nil { + return api.SectorOffset{}, err + } + + color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset) + + <-pi.done + + return so, nil } ast.Internal.StorageList = si.StorageList @@ -414,11 +481,61 @@ var lpBoostProxyCmd = &cli.Command{ ast.Internal.StorageTryLock = si.StorageTryLock ast.Internal.StorageGetLocks = si.StorageGetLocks + var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + // /piece?piece_cid=xxxx + pieceCid, err := cid.Decode(r.URL.Query().Get("piece_cid")) + if err != nil { + http.Error(w, "bad piece_cid", http.StatusBadRequest) + return + } + + fmt.Printf("%s request for piece from %s\n", pieceCid, r.RemoteAddr) + + pieceInfoLk.Lock() + pis, ok := pieceInfos[pieceCid] + if !ok { + http.Error(w, "piece not found", http.StatusNotFound) + color.Red("%s not found", pieceCid) + pieceInfoLk.Unlock() + return + } + + // pop + pi := pis[0] + pis = pis[1:] + + pieceInfoLk.Unlock() + + start := time.Now() + + pieceData := io.LimitReader(io.MultiReader( + pi.data, + nullreader.Reader{}, + ), int64(pi.size)) + + n, err := io.Copy(w, pieceData) + close(pi.done) + + took := time.Since(start) + mbps := float64(n) / (1024 * 1024) / took.Seconds() + + if err != nil { + log.Errorf("copying piece data: %s", err) + return + } + + color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pieceCid, float64(n)/(1024*1024), took, mbps) + } + mh, err := node.MinerHandler(&ast, false) // todo permissioned if err != nil { return err } + mux := http.NewServeMux() + mux.Handle("/piece", pieceHandler) + mux.Handle("/", mh) + { tok, err := lp.AuthNew(ctx, api.AllPermissions) if err != nil { @@ -426,16 +543,6 @@ var lpBoostProxyCmd = &cli.Command{ } // parse listen into multiaddr - laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen")) - if err != nil { - return xerrors.Errorf("net resolve: %w", err) - } - - if len(laddr.IP) == 0 { - // set localhost - laddr.IP = net.IPv4(127, 0, 0, 1) - } - ma, err := manet.FromNetAddr(laddr) if err != nil { return xerrors.Errorf("net from addr (%v): %w", laddr, err) @@ -444,6 +551,6 @@ var lpBoostProxyCmd = &cli.Command{ fmt.Printf("Token: %s:%s\n", tok, ma) } - return http.ListenAndServe(cctx.String("listen"), mh) + return http.ListenAndServe(cctx.String("listen"), mux) }, }