diff --git a/cmd/lotus-provider/rpc/rpc.go b/cmd/lotus-provider/rpc/rpc.go index 8cc9ee5ab..4734843fa 100644 --- a/cmd/lotus-provider/rpc/rpc.go +++ b/cmd/lotus-provider/rpc/rpc.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/url" + "os" "time" "github.com/gbrlsnchs/jwt/v3" @@ -33,6 +34,8 @@ import ( var log = logging.Logger("lp/rpc") +var permissioned = os.Getenv("LOTUS_DISABLE_AUTH_PERMISSIONED") != "1" + func LotusProviderHandler( authv func(ctx context.Context, token string) ([]auth.Permission, error), remote http.HandlerFunc, @@ -120,7 +123,7 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c authVerify, remoteHandler, &ProviderAPI{dependencies, shutdownChan}, - true), + permissioned), ReadHeaderTimeout: time.Minute * 3, BaseContext: func(listener net.Listener) context.Context { ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go new file mode 100644 index 000000000..a9401a1ce --- /dev/null +++ b/cmd/lotus-shed/lpdeal.go @@ -0,0 +1,284 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "os" + + "github.com/fatih/color" + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + commcid "github.com/filecoin-project/go-fil-commcid" + commp "github.com/filecoin-project/go-fil-commp-hashhash" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/go-state-types/builtin/v9/market" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/lib/must" +) + +var lpUtilCmd = &cli.Command{ + Name: "provider-util", + Usage: "lotus provider utility commands", + Subcommands: []*cli.Command{ + lpUtilStartDealCmd, + }, +} + +var lpUtilStartDealCmd = &cli.Command{ + Name: "start-deal", + Usage: "start a deal with a specific lotus-provider instance", + ArgsUsage: "[dataFile] [miner]", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "provider-rpc", + Value: "http://127.0.0.1:12300", + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() != 2 { + return xerrors.Errorf("expected 2 arguments") + } + + maddr, err := address.NewFromString(cctx.Args().Get(1)) + if err != nil { + return xerrors.Errorf("parse miner address: %w", err) + } + + full, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + defAddr, err := full.WalletDefaultAddress(ctx) + if err != nil { + return xerrors.Errorf("get default address: %w", err) + } + + // open rpc + var rpc api.LotusProviderStruct + closer2, err := jsonrpc.NewMergeClient(ctx, cctx.String("provider-rpc"), "Filecoin", []interface{}{&rpc.Internal}, nil) + if err != nil { + return xerrors.Errorf("open rpc: %w", err) + } + defer closer2() + + v, err := rpc.Version(ctx) + if err != nil { + return xerrors.Errorf("rpc version: %w", err) + } + + fmt.Printf("* provider version: %s\n", v.String()) + + // open data file + data, err := homedir.Expand(cctx.Args().Get(0)) + if err != nil { + return xerrors.Errorf("get data file: %w", err) + } + + df, err := os.Open(data) + if err != nil { + return xerrors.Errorf("open data file: %w", err) + } + + dstat, err := df.Stat() + if err != nil { + return xerrors.Errorf("stat data file: %w", err) + } + + // compute commd + color.Green("> computing piece CID\n") + + writer := new(commp.Calc) + _, err = io.Copy(writer, df) + if err != nil { + return xerrors.Errorf("compute commd copy: %w", err) + } + + commp, pps, err := writer.Digest() + if err != nil { + return xerrors.Errorf("compute commd: %w", err) + } + + pieceCid, err := commcid.PieceCommitmentV1ToCID(commp) + if err != nil { + return xerrors.Errorf("make pieceCid: %w", err) + } + + fmt.Printf("* piece CID: %s\n", pieceCid) + fmt.Printf("* piece size: %d\n", pps) + + // start serving the file + color.Green("> starting temp http server\n") + + deleteCalled := make(chan struct{}) + + mux := http.NewServeMux() + mux.HandleFunc("/"+pieceCid.String(), func(w http.ResponseWriter, r *http.Request) { + // log request and method + color.Blue("< %s %s\n", r.Method, r.URL) + + if r.Method == http.MethodDelete { + close(deleteCalled) + return + } + + http.ServeFile(w, r, data) + }) + + ts := httptest.NewServer(mux) + + dataUrl, err := url.Parse(ts.URL) + if err != nil { + return xerrors.Errorf("parse data url: %w", err) + } + dataUrl.Path = "/" + pieceCid.String() + + fmt.Printf("* data url: %s\n", dataUrl) + + // publish the deal + color.Green("> publishing deal\n") + + head, err := full.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("get chain head: %w", err) + } + + verif := false + + bds, err := full.StateDealProviderCollateralBounds(ctx, abi.PaddedPieceSize(pps), verif, head.Key()) + if err != nil { + return xerrors.Errorf("get provider collateral bounds: %w", err) + } + + pcoll := big.Mul(bds.Min, big.NewInt(2)) + + dealProposal := market.DealProposal{ + PieceCID: pieceCid, + PieceSize: abi.PaddedPieceSize(pps), + VerifiedDeal: verif, + Client: defAddr, + Provider: maddr, + Label: must.One(market.NewLabelFromString("lotus-shed-made-this")), + StartEpoch: head.Height() + 2000, + EndEpoch: head.Height() + 2880*300, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: pcoll, + ClientCollateral: big.Zero(), + } + + pbuf, err := cborutil.Dump(&dealProposal) + if err != nil { + return xerrors.Errorf("dump deal proposal: %w", err) + } + + sig, err := full.WalletSign(ctx, defAddr, pbuf) + if err != nil { + return xerrors.Errorf("sign deal proposal: %w", err) + } + + params := market.PublishStorageDealsParams{ + Deals: []market.ClientDealProposal{ + { + Proposal: dealProposal, + ClientSignature: *sig, + }, + }, + } + + var buf bytes.Buffer + err = params.MarshalCBOR(&buf) + if err != nil { + return xerrors.Errorf("marshal params: %w", err) + } + + msg := &types.Message{ + To: builtin.StorageMarketActorAddr, + From: defAddr, + Method: builtin.MethodsMarket.PublishStorageDeals, + Params: buf.Bytes(), + } + + smsg, err := full.MpoolPushMessage(ctx, msg, nil) + if err != nil { + return xerrors.Errorf("push message: %w", err) + } + + fmt.Printf("* PSD message cid: %s\n", smsg.Cid()) + + // wait for deal to be published + color.Green("> waiting for PublishStorageDeals to land on chain\n") + + rcpt, err := full.StateWaitMsg(ctx, smsg.Cid(), 3) + if err != nil { + return xerrors.Errorf("wait message: %w", err) + } + + if rcpt.Receipt.ExitCode != 0 { + return xerrors.Errorf("publish deal failed: exit code %d", rcpt.Receipt.ExitCode) + } + + // parse results + var ret market.PublishStorageDealsReturn + err = ret.UnmarshalCBOR(bytes.NewReader(rcpt.Receipt.Return)) + if err != nil { + return xerrors.Errorf("unmarshal return: %w", err) + } + + if len(ret.IDs) != 1 { + return xerrors.Errorf("expected 1 deal id, got %d", len(ret.IDs)) + } + + dealId := ret.IDs[0] + + fmt.Printf("* deal id: %d\n", dealId) + + // start deal + color.Green("> starting deal\n") + + pcid := smsg.Cid() + + pdi := api.PieceDealInfo{ + PublishCid: &pcid, + DealID: dealId, + DealProposal: &dealProposal, + DealSchedule: api.DealSchedule{ + StartEpoch: dealProposal.StartEpoch, + EndEpoch: dealProposal.EndEpoch, + }, + KeepUnsealed: true, + } + + soff, err := rpc.AllocatePieceToSector(ctx, maddr, pdi, dstat.Size(), *dataUrl, nil) + if err != nil { + return xerrors.Errorf("allocate piece to sector: %w", err) + } + + fmt.Printf("* sector offset: %d\n", soff) + + // wait for delete call on the file + color.Green("> waiting for file to be deleted (on sector finalize)\n") + + <-deleteCalled + + fmt.Println("* done") + + return nil + }, +} diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index a5b66a096..d5a2ad2cd 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -92,6 +92,7 @@ func main() { FevmAnalyticsCmd, mismatchesCmd, blockCmd, + lpUtilCmd, } app := &cli.App{ diff --git a/provider/lpmarket/deal_ingest.go b/provider/lpmarket/deal_ingest.go index 6fd9a01da..688d1881b 100644 --- a/provider/lpmarket/deal_ingest.go +++ b/provider/lpmarket/deal_ingest.go @@ -67,13 +67,18 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address return api.SectorOffset{}, xerrors.Errorf("getting seal proof type: %w", err) } + mid, err := address.IDFromAddress(maddr) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("getting miner ID: %w", err) + } + num, err := lpseal.AllocateSectorNumbers(ctx, p.api, p.db, maddr, 1, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) { if len(numbers) != 1 { return false, xerrors.Errorf("expected one sector number") } n := numbers[0] - _, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", maddr, n, spt) + _, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt) if err != nil { return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err) } @@ -105,7 +110,7 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address f05_deal_proposal, f05_deal_start_epoch, f05_deal_end_epoch) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`, - maddr, n, 0, + mid, n, 0, piece.DealProposal.PieceCID, piece.DealProposal.PieceSize, source.String(), dataHdrJson, rawSize, true, piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)