lpdeal piece proxy

This commit is contained in:
Łukasz Magiera 2024-02-06 11:24:28 +01:00
parent 93ac1809be
commit 76e67bbc9d

View File

@ -7,11 +7,13 @@ import (
"github.com/fatih/color"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
"github.com/filecoin-project/lotus/lib/nullreader"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/provider/lpmarket"
"github.com/filecoin-project/lotus/provider/lpmarket/fakelm"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/ipfs/go-cid"
"github.com/mitchellh/go-homedir"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
@ -22,6 +24,8 @@ import (
"net/http/httptest"
"net/url"
"os"
"sync"
"time"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
@ -378,6 +382,20 @@ var lpBoostProxyCmd = &cli.Command{
lp := fakelm.NewLMRPCProvider(si, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, cctx.String("layers"))
laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen"))
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
rootUrl := url.URL{
Scheme: "http",
Host: laddr.String(),
}
ast := api.StorageMinerStruct{}
ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) {
@ -398,8 +416,57 @@ var lpBoostProxyCmd = &cli.Command{
ast.Internal.SectorsListInStates = lp.SectorsListInStates
ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal
ast.Internal.ComputeDataCid = lp.ComputeDataCid
ast.Internal.SectorAddPieceToAny = func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data, p3 api.PieceDealInfo) (api.SectorOffset, error) {
panic("implement me")
type pieceInfo struct {
data storiface.Data
size abi.UnpaddedPieceSize
done chan struct{}
}
pieceInfoLk := new(sync.Mutex)
pieceInfos := map[cid.Cid][]pieceInfo{}
ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
origPieceData := pieceData
defer func() {
closer, ok := origPieceData.(io.Closer)
if !ok {
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
return
}
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
}()
pi := pieceInfo{
data: pieceData,
size: pieceSize,
done: make(chan struct{}),
}
pieceInfoLk.Lock()
pieceInfos[deal.DealProposal.PieceCID] = append(pieceInfos[deal.DealProposal.PieceCID], pi)
pieceInfoLk.Unlock()
// /piece?piece_cid=xxxx
dataUrl := rootUrl
dataUrl.Path = "/piece"
dataUrl.RawQuery = "piece_cid=" + deal.DealProposal.PieceCID.String()
// make a sector
so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), dataUrl, nil)
if err != nil {
return api.SectorOffset{}, err
}
color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
<-pi.done
return so, nil
}
ast.Internal.StorageList = si.StorageList
@ -414,11 +481,61 @@ var lpBoostProxyCmd = &cli.Command{
ast.Internal.StorageTryLock = si.StorageTryLock
ast.Internal.StorageGetLocks = si.StorageGetLocks
var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
// /piece?piece_cid=xxxx
pieceCid, err := cid.Decode(r.URL.Query().Get("piece_cid"))
if err != nil {
http.Error(w, "bad piece_cid", http.StatusBadRequest)
return
}
fmt.Printf("%s request for piece from %s\n", pieceCid, r.RemoteAddr)
pieceInfoLk.Lock()
pis, ok := pieceInfos[pieceCid]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
color.Red("%s not found", pieceCid)
pieceInfoLk.Unlock()
return
}
// pop
pi := pis[0]
pis = pis[1:]
pieceInfoLk.Unlock()
start := time.Now()
pieceData := io.LimitReader(io.MultiReader(
pi.data,
nullreader.Reader{},
), int64(pi.size))
n, err := io.Copy(w, pieceData)
close(pi.done)
took := time.Since(start)
mbps := float64(n) / (1024 * 1024) / took.Seconds()
if err != nil {
log.Errorf("copying piece data: %s", err)
return
}
color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pieceCid, float64(n)/(1024*1024), took, mbps)
}
mh, err := node.MinerHandler(&ast, false) // todo permissioned
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/piece", pieceHandler)
mux.Handle("/", mh)
{
tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
@ -426,16 +543,6 @@ var lpBoostProxyCmd = &cli.Command{
}
// parse listen into multiaddr
laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen"))
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
@ -444,6 +551,6 @@ var lpBoostProxyCmd = &cli.Command{
fmt.Printf("Token: %s:%s\n", tok, ma)
}
return http.ListenAndServe(cctx.String("listen"), mh)
return http.ListenAndServe(cctx.String("listen"), mux)
},
}