v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
13 changed files with 608 additions and 510 deletions
Showing only changes of commit f8a5aa0db5 - Show all commits

View File

@ -388,7 +388,7 @@ func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.CurioConfig, error)
for _, k := range meta.Keys() { for _, k := range meta.Keys() {
have = append(have, strings.Join(k, " ")) have = append(have, strings.Join(k, " "))
} }
log.Infow("Using layer", "layer", layer, "config", curioConfig) log.Debugw("Using layer", "layer", layer, "config", curioConfig)
} }
_ = have // FUTURE: verify that required fields are here. _ = have // FUTURE: verify that required fields are here.
// If config includes 3rd-party config, consider JSONSchema as a way that // If config includes 3rd-party config, consider JSONSchema as a way that

View File

@ -54,6 +54,7 @@ func main() {
webCmd, webCmd,
guidedsetup.GuidedsetupCmd, guidedsetup.GuidedsetupCmd,
sealCmd, sealCmd,
marketCmd,
} }
jaeger := tracing.SetupJaegerTracing("curio") jaeger := tracing.SetupJaegerTracing("curio")

70
cmd/curio/market.go Normal file
View File

@ -0,0 +1,70 @@
package main
import (
"fmt"
"sort"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/curiosrc/market/lmrpc"
)
var marketCmd = &cli.Command{
Name: "market",
Subcommands: []*cli.Command{
marketRPCInfoCmd,
},
}
var marketRPCInfoCmd = &cli.Command{
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
},
},
Action: func(cctx *cli.Context) error {
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
cfg, err := deps.GetConfig(cctx, db)
if err != nil {
return xerrors.Errorf("get config: %w", err)
}
ts, err := lmrpc.MakeTokens(cfg)
if err != nil {
return xerrors.Errorf("make tokens: %w", err)
}
var addrTokens []struct {
Address string
Token string
}
for address, s := range ts {
addrTokens = append(addrTokens, struct {
Address string
Token string
}{
Address: address.String(),
Token: s,
})
}
sort.Slice(addrTokens, func(i, j int) bool {
return addrTokens[i].Address < addrTokens[j].Address
})
for _, at := range addrTokens {
fmt.Printf("[lotus-miner/boost compatible] %s %s\n", at.Address, at.Token)
}
return nil
},
Name: "rpc-info",
}

View File

@ -10,12 +10,14 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/cmd/curio/rpc" "github.com/filecoin-project/lotus/cmd/curio/rpc"
"github.com/filecoin-project/lotus/cmd/curio/tasks" "github.com/filecoin-project/lotus/cmd/curio/tasks"
"github.com/filecoin-project/lotus/curiosrc/market/lmrpc"
"github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
@ -133,6 +135,11 @@ var runCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
if err := lmrpc.ServeCurioMarketRPCFromConfig(dependencies.DB, dependencies.Full, dependencies.Cfg); err != nil {
return xerrors.Errorf("starting market RPCs: %w", err)
}
finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper},
//node.ShutdownHandler{Component: "curio", StopFunc: stop}, //node.ShutdownHandler{Component: "curio", StopFunc: stop},

View File

@ -2,22 +2,15 @@ package main
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os" "os"
"sync"
"time"
"github.com/fatih/color" "github.com/fatih/color"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,19 +25,9 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/market" "github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps"
cumarket "github.com/filecoin-project/lotus/curiosrc/market"
"github.com/filecoin-project/lotus/curiosrc/market/fakelm"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/lib/must"
"github.com/filecoin-project/lotus/lib/nullreader"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
var lpUtilCmd = &cli.Command{ var lpUtilCmd = &cli.Command{
@ -52,7 +35,6 @@ var lpUtilCmd = &cli.Command{
Usage: "lotus provider utility commands", Usage: "lotus provider utility commands",
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
lpUtilStartDealCmd, lpUtilStartDealCmd,
lpBoostProxyCmd,
}, },
} }
@ -300,360 +282,3 @@ var lpUtilStartDealCmd = &cli.Command{
return nil return nil
}, },
} }
var lpBoostProxyCmd = &cli.Command{
Name: "boost-proxy",
Usage: "Start a legacy lotus-miner rpc proxy",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "actor-address",
Usage: "Address of the miner actor",
Required: true,
},
&cli.StringFlag{
Name: "db-host",
EnvVars: []string{"LOTUS_DB_HOST"},
Usage: "Command separated list of hostnames for yugabyte cluster",
Value: "yugabyte",
},
&cli.StringFlag{
Name: "db-name",
EnvVars: []string{"LOTUS_DB_NAME", "LOTUS_HARMONYDB_HOSTS"},
Value: "yugabyte",
},
&cli.StringFlag{
Name: "db-user",
EnvVars: []string{"LOTUS_DB_USER", "LOTUS_HARMONYDB_USERNAME"},
Value: "yugabyte",
},
&cli.StringFlag{
Name: "db-password",
EnvVars: []string{"LOTUS_DB_PASSWORD", "LOTUS_HARMONYDB_PASSWORD"},
Value: "yugabyte",
},
&cli.StringFlag{
Name: "db-port",
EnvVars: []string{"LOTUS_DB_PORT", "LOTUS_HARMONYDB_PORT"},
Hidden: true,
Value: "5433",
},
&cli.StringFlag{
Name: "layers",
EnvVars: []string{"LOTUS_LAYERS", "LOTUS_CONFIG_LAYERS"},
Value: "base",
},
&cli.StringFlag{
Name: "listen",
Usage: "Address to listen on",
Value: ":32100",
},
},
Action: func(cctx *cli.Context) error {
ctx := lcli.ReqContext(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
maddr, err := address.NewFromString(cctx.String("actor-address"))
if err != nil {
return xerrors.Errorf("parsing miner address: %w", err)
}
full, closer, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
pin := cumarket.NewPieceIngester(db, full)
si := paths.NewDBIndex(nil, db)
mid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner id: %w", err)
}
mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}
lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, cctx.String("layers"))
laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen"))
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
rootUrl := url.URL{
Scheme: "http",
Host: laddr.String(),
}
ast := api.StorageMinerStruct{}
ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) {
return api.APIVersion{
Version: "lp-proxy-v0",
APIVersion: api.MinerAPIVersion0,
BlockDelay: build.BlockDelaySecs,
}, nil
}
ast.CommonStruct.Internal.AuthNew = lp.AuthNew
ast.Internal.ActorAddress = lp.ActorAddress
ast.Internal.WorkerJobs = lp.WorkerJobs
ast.Internal.SectorsStatus = lp.SectorsStatus
ast.Internal.SectorsList = lp.SectorsList
ast.Internal.SectorsSummary = lp.SectorsSummary
ast.Internal.SectorsListInStates = lp.SectorsListInStates
ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal
ast.Internal.ComputeDataCid = lp.ComputeDataCid
type pieceInfo struct {
data storiface.Data
size abi.UnpaddedPieceSize
done chan struct{}
}
pieceInfoLk := new(sync.Mutex)
pieceInfos := map[uuid.UUID][]pieceInfo{}
ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
origPieceData := pieceData
defer func() {
closer, ok := origPieceData.(io.Closer)
if !ok {
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
return
}
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
}()
pi := pieceInfo{
data: pieceData,
size: pieceSize,
done: make(chan struct{}),
}
pieceUUID := uuid.New()
color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
pieceInfoLk.Lock()
pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
pieceInfoLk.Unlock()
// /piece?piece_cid=xxxx
dataUrl := rootUrl
dataUrl.Path = "/piece"
dataUrl.RawQuery = "piece_id=" + pieceUUID.String()
// add piece entry
var refID int64
var pieceWasCreated bool
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
VALUES ($1, $2, $3)
ON CONFLICT (piece_cid) DO NOTHING
RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
}
} else {
pieceWasCreated = false // Piece already exists, no new piece was created
}
// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
}
// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
}
// wait for piece to be parked
if pieceWasCreated {
<-pi.done
} else {
// If the piece was not created, we need to close the done channel
close(pi.done)
go func() {
// close the data reader (drain to eof if it's not a closer)
if closer, ok := pieceData.(io.Closer); ok {
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
} else {
log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData))
_, err := io.Copy(io.Discard, pieceData)
if err != nil {
log.Warnw("draining pieceData in DataCid", "error", err)
}
}
}()
}
pieceIDUrl := url.URL{
Scheme: "pieceref",
Opaque: fmt.Sprintf("%d", refID),
}
// make a sector
so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil)
if err != nil {
return api.SectorOffset{}, err
}
color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
return so, nil
}
ast.Internal.StorageList = si.StorageList
ast.Internal.StorageDetach = si.StorageDetach
ast.Internal.StorageReportHealth = si.StorageReportHealth
ast.Internal.StorageDeclareSector = si.StorageDeclareSector
ast.Internal.StorageDropSector = si.StorageDropSector
ast.Internal.StorageFindSector = si.StorageFindSector
ast.Internal.StorageInfo = si.StorageInfo
ast.Internal.StorageBestAlloc = si.StorageBestAlloc
ast.Internal.StorageLock = si.StorageLock
ast.Internal.StorageTryLock = si.StorageTryLock
ast.Internal.StorageGetLocks = si.StorageGetLocks
var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
// /piece?piece_id=xxxx
pieceUUID := r.URL.Query().Get("piece_id")
pu, err := uuid.Parse(pieceUUID)
if err != nil {
http.Error(w, "bad piece id", http.StatusBadRequest)
return
}
if r.Method != http.MethodGet {
http.Error(w, "bad method", http.StatusMethodNotAllowed)
return
}
fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr)
pieceInfoLk.Lock()
pis, ok := pieceInfos[pu]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
color.Red("%s not found", pu)
pieceInfoLk.Unlock()
return
}
// pop
pi := pis[0]
pis = pis[1:]
pieceInfos[pu] = pis
if len(pis) == 0 {
delete(pieceInfos, pu)
}
pieceInfoLk.Unlock()
start := time.Now()
pieceData := io.LimitReader(io.MultiReader(
pi.data,
nullreader.Reader{},
), int64(pi.size))
n, err := io.Copy(w, pieceData)
close(pi.done)
took := time.Since(start)
mbps := float64(n) / (1024 * 1024) / took.Seconds()
if err != nil {
log.Errorf("copying piece data: %s", err)
return
}
color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps)
}
finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
mh, err := node.MinerHandler(finalApi, false) // todo permissioned
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/piece", pieceHandler)
mux.Handle("/", mh)
{
tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}
// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}
fmt.Printf("Token: %s:%s\n", tok, ma)
}
server := &http.Server{
Addr: cctx.String("listen"),
Handler: mux,
ReadTimeout: 48 * time.Hour,
WriteTimeout: 48 * time.Hour, // really high because we block until TreeD
}
return server.ListenAndServe()
},
}

View File

@ -6,7 +6,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"github.com/BurntSushi/toml"
"github.com/gbrlsnchs/jwt/v3" "github.com/gbrlsnchs/jwt/v3"
"github.com/google/uuid" "github.com/google/uuid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -39,10 +38,10 @@ type LMRPCProvider struct {
pi market.Ingester pi market.Ingester
db *harmonydb.DB db *harmonydb.DB
confLayer string conf *config.CurioConfig
} }
func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, confLayer string) *LMRPCProvider { func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, conf *config.CurioConfig) *LMRPCProvider {
return &LMRPCProvider{ return &LMRPCProvider{
si: si, si: si,
full: full, full: full,
@ -51,7 +50,7 @@ func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Add
ssize: ssize, ssize: ssize,
pi: pi, pi: pi,
db: db, db: db,
confLayer: confLayer, conf: conf,
} }
} }
@ -330,24 +329,6 @@ func (l *LMRPCProvider) AllocatePieceToSector(ctx context.Context, maddr address
} }
func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([]byte, error) { func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([]byte, error) {
var cs []struct {
Config string
}
err := l.db.Select(ctx, &cs, "select config from harmony_config where title = $1", l.confLayer)
if err != nil {
return nil, err
}
if len(cs) == 0 {
return nil, xerrors.Errorf("no harmony config found")
}
lp := config.DefaultCurioConfig()
if _, err := toml.Decode(cs[0].Config, lp); err != nil {
return nil, xerrors.Errorf("decode harmony config: %w", err)
}
type jwtPayload struct { type jwtPayload struct {
Allow []auth.Permission Allow []auth.Permission
} }
@ -356,7 +337,7 @@ func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([
Allow: perms, Allow: perms,
} }
sk, err := base64.StdEncoding.DecodeString(lp.Apis.StorageRPCSecret) sk, err := base64.StdEncoding.DecodeString(l.conf.Apis.StorageRPCSecret)
if err != nil { if err != nil {
return nil, xerrors.Errorf("decode secret: %w", err) return nil, xerrors.Errorf("decode secret: %w", err)
} }

View File

@ -0,0 +1,422 @@
package lmrpc
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
manet "github.com/multiformats/go-multiaddr/net"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
cumarket "github.com/filecoin-project/lotus/curiosrc/market"
"github.com/filecoin-project/lotus/curiosrc/market/fakelm"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/nullreader"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("lmrpc")
func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
return forEachMarketRPC(cfg, func(maddr string, listen string) error {
addr, err := address.NewFromString(maddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}
go func() {
err := ServeCurioMarketRPC(db, full, addr, cfg, listen)
if err != nil {
log.Errorf("failed to serve market rpc: %s", err)
}
}()
return nil
})
}
func MakeTokens(cfg *config.CurioConfig) (map[address.Address]string, error) {
out := map[address.Address]string{}
err := forEachMarketRPC(cfg, func(smaddr string, listen string) error {
ctx := context.Background()
laddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
// need minimal provider with just the config
lp := fakelm.NewLMRPCProvider(nil, nil, address.Undef, 0, 0, nil, nil, cfg)
tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}
// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}
maddr, err := address.NewFromString(smaddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}
token := fmt.Sprintf("%s:%s", tok, ma)
out[maddr] = token
return nil
})
return out, err
}
func forEachMarketRPC(cfg *config.CurioConfig, cb func(string, string) error) error {
for n, server := range cfg.Subsystems.BoostAdapters {
n := n
// server: [f0.. actor address]:[bind address]
// bind address is either a numeric port or a full address
// first split at first : to get the actor address and the bind address
split := strings.SplitN(server, ":", 2)
// if the split length is not 2, return an error
if len(split) != 2 {
return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server)
}
// get the actor address and the bind address
strMaddr, strListen := split[0], split[1]
maddr, err := address.NewFromString(strMaddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}
// check the listen address
if strListen == "" {
return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server)
}
// if listen address is numeric, prepend the default host
if _, err := strconv.Atoi(strListen); err == nil {
strListen = "0.0.0.0:" + strListen
}
// check if the listen address is a valid address
if _, _, err := net.SplitHostPort(strListen); err != nil {
return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server)
}
log.Infow("Starting market RPC server", "actor", maddr, "listen", strListen)
if err := cb(strMaddr, strListen); err != nil {
return err
}
}
return nil
}
func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Address, conf *config.CurioConfig, listen string) error {
ctx := context.Background()
pin := cumarket.NewPieceIngester(db, full)
si := paths.NewDBIndex(nil, db)
mid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner id: %w", err)
}
mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}
lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, conf)
laddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
rootUrl := url.URL{
Scheme: "http",
Host: laddr.String(),
}
ast := api.StorageMinerStruct{}
ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) {
return api.APIVersion{
Version: "curio-proxy-v0",
APIVersion: api.MinerAPIVersion0,
BlockDelay: build.BlockDelaySecs,
}, nil
}
ast.CommonStruct.Internal.AuthNew = lp.AuthNew
ast.Internal.ActorAddress = lp.ActorAddress
ast.Internal.WorkerJobs = lp.WorkerJobs
ast.Internal.SectorsStatus = lp.SectorsStatus
ast.Internal.SectorsList = lp.SectorsList
ast.Internal.SectorsSummary = lp.SectorsSummary
ast.Internal.SectorsListInStates = lp.SectorsListInStates
ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal
ast.Internal.ComputeDataCid = lp.ComputeDataCid
type pieceInfo struct {
data storiface.Data
size abi.UnpaddedPieceSize
done chan struct{}
}
pieceInfoLk := new(sync.Mutex)
pieceInfos := map[uuid.UUID][]pieceInfo{}
ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
origPieceData := pieceData
defer func() {
closer, ok := origPieceData.(io.Closer)
if !ok {
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
return
}
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
}()
pi := pieceInfo{
data: pieceData,
size: pieceSize,
done: make(chan struct{}),
}
pieceUUID := uuid.New()
//color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
log.Infow("piece assign request", "piece_cid", deal.DealProposal.PieceCID, "provider", deal.DealProposal.Provider, "piece_uuid", pieceUUID)
pieceInfoLk.Lock()
pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
pieceInfoLk.Unlock()
// /piece?piece_cid=xxxx
dataUrl := rootUrl
dataUrl.Path = "/piece"
dataUrl.RawQuery = "piece_id=" + pieceUUID.String()
// add piece entry
var refID int64
var pieceWasCreated bool
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
VALUES ($1, $2, $3)
ON CONFLICT (piece_cid) DO NOTHING
RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
}
} else {
pieceWasCreated = false // Piece already exists, no new piece was created
}
// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
}
// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
}
// wait for piece to be parked
if pieceWasCreated {
<-pi.done
} else {
// If the piece was not created, we need to close the done channel
close(pi.done)
go func() {
// close the data reader (drain to eof if it's not a closer)
if closer, ok := pieceData.(io.Closer); ok {
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
} else {
log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData))
_, err := io.Copy(io.Discard, pieceData)
if err != nil {
log.Warnw("draining pieceData in DataCid", "error", err)
}
}
}()
}
pieceIDUrl := url.URL{
Scheme: "pieceref",
Opaque: fmt.Sprintf("%d", refID),
}
// make a sector
so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil)
if err != nil {
return api.SectorOffset{}, err
}
log.Infow("piece assigned to sector", "piece_cid", deal.DealProposal.PieceCID, "sector", so.Sector, "offset", so.Offset)
return so, nil
}
ast.Internal.StorageList = si.StorageList
ast.Internal.StorageDetach = si.StorageDetach
ast.Internal.StorageReportHealth = si.StorageReportHealth
ast.Internal.StorageDeclareSector = si.StorageDeclareSector
ast.Internal.StorageDropSector = si.StorageDropSector
ast.Internal.StorageFindSector = si.StorageFindSector
ast.Internal.StorageInfo = si.StorageInfo
ast.Internal.StorageBestAlloc = si.StorageBestAlloc
ast.Internal.StorageLock = si.StorageLock
ast.Internal.StorageTryLock = si.StorageTryLock
ast.Internal.StorageGetLocks = si.StorageGetLocks
var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
// /piece?piece_id=xxxx
pieceUUID := r.URL.Query().Get("piece_id")
pu, err := uuid.Parse(pieceUUID)
if err != nil {
http.Error(w, "bad piece id", http.StatusBadRequest)
return
}
if r.Method != http.MethodGet {
http.Error(w, "bad method", http.StatusMethodNotAllowed)
return
}
fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr)
pieceInfoLk.Lock()
pis, ok := pieceInfos[pu]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
log.Warnw("piece not found", "piece_uuid", pu)
pieceInfoLk.Unlock()
return
}
// pop
pi := pis[0]
pis = pis[1:]
pieceInfos[pu] = pis
if len(pis) == 0 {
delete(pieceInfos, pu)
}
pieceInfoLk.Unlock()
start := time.Now()
pieceData := io.LimitReader(io.MultiReader(
pi.data,
nullreader.Reader{},
), int64(pi.size))
n, err := io.Copy(w, pieceData)
close(pi.done)
took := time.Since(start)
mbps := float64(n) / (1024 * 1024) / took.Seconds()
if err != nil {
log.Errorf("copying piece data: %s", err)
return
}
log.Infow("piece served", "piece_uuid", pu, "size", float64(n)/(1024*1024), "duration", took, "speed", mbps)
}
finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
mh, err := node.MinerHandler(finalApi, false) // todo permissioned
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle("/piece", pieceHandler)
mux.Handle("/", mh)
server := &http.Server{
Addr: listen,
Handler: mux,
ReadTimeout: 48 * time.Hour,
WriteTimeout: 48 * time.Hour, // really high because we block until pieces are saved in PiecePark
}
return server.ListenAndServe()
}

View File

@ -18,6 +18,7 @@ COMMANDS:
web Start Curio web interface web Start Curio web interface
guided-setup Run the guided setup for migrating from lotus-miner to Curio guided-setup Run the guided setup for migrating from lotus-miner to Curio
seal Manage the sealing pipeline seal Manage the sealing pipeline
market
auth Manage RPC permissions auth Manage RPC permissions
log Manage logging log Manage logging
wait-api Wait for lotus api to come online wait-api Wait for lotus api to come online
@ -347,6 +348,35 @@ OPTIONS:
--help, -h show help --help, -h show help
``` ```
## curio market
```
NAME:
curio market
USAGE:
curio market command [command options] [arguments...]
COMMANDS:
rpc-info
help, h Shows a list of commands or help for one command
OPTIONS:
--help, -h show help
```
### curio market rpc-info
```
NAME:
curio market rpc-info
USAGE:
curio market rpc-info [command options] [arguments...]
OPTIONS:
--layers value [ --layers value ] list of layers to be interpreted (atop defaults). Default: base
--help, -h show help
```
## curio auth ## curio auth
``` ```
NAME: NAME:

View File

@ -139,6 +139,27 @@
# type: int # type: int
#MoveStorageMaxTasks = 0 #MoveStorageMaxTasks = 0
# BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
# This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
# Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0
# Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified.
#
# When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the
# deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one
# node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data.
# This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was
# received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for
# the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
# sealed.
#
# To get API info for boost configuration run 'curio market rpc-info'
#
# NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
# a machine which handles ParkPiece tasks.
#
# type: []string
#BoostAdapters = []
# EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should # EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should
# only need to be run on a single machine in the cluster. # only need to be run on a single machine in the cluster.
# #

View File

@ -331,6 +331,7 @@ func DefaultCurioConfig() *CurioConfig {
return &CurioConfig{ return &CurioConfig{
Subsystems: CurioSubsystemsConfig{ Subsystems: CurioSubsystemsConfig{
GuiAddress: ":4701", GuiAddress: ":4701",
BoostAdapters: []string{},
}, },
Fees: CurioFees{ Fees: CurioFees{
DefaultMaxFee: DefaultDefaultMaxFee, DefaultMaxFee: DefaultDefaultMaxFee,

View File

@ -561,6 +561,28 @@ SDRTrees machine into long-term storage. This task runs after the Finalize task.
Comment: `The maximum amount of MoveStorage tasks that can run simultaneously. Note that the maximum number of tasks will Comment: `The maximum amount of MoveStorage tasks that can run simultaneously. Note that the maximum number of tasks will
also be bounded by resources available on the machine. It is recommended that this value is set to a number which also be bounded by resources available on the machine. It is recommended that this value is set to a number which
uses all available network (or disk) bandwidth on the machine without causing bottlenecks.`, uses all available network (or disk) bandwidth on the machine without causing bottlenecks.`,
},
{
Name: "BoostAdapters",
Type: "[]string",
Comment: `BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0
Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified.
When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the
deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one
node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data.
This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was
received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for
the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
sealed.
To get API info for boost configuration run 'curio market rpc-info'
NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
a machine which handles ParkPiece tasks.`,
}, },
{ {
Name: "EnableWebGui", Name: "EnableWebGui",

View File

@ -200,6 +200,25 @@ type CurioSubsystemsConfig struct {
// uses all available network (or disk) bandwidth on the machine without causing bottlenecks. // uses all available network (or disk) bandwidth on the machine without causing bottlenecks.
MoveStorageMaxTasks int MoveStorageMaxTasks int
// BoostAdapters is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
// This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
// Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0
// Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified.
//
// When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the
// deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one
// node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data.
// This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was
// received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for
// the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
// sealed.
//
// To get API info for boost configuration run 'curio market rpc-info'
//
// NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
// a machine which handles ParkPiece tasks.
BoostAdapters []string
// EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should // EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should
// only need to be run on a single machine in the cluster. // only need to be run on a single machine in the cluster.
EnableWebGui bool EnableWebGui bool

View File

@ -2,26 +2,15 @@ package modules
import ( import (
"context" "context"
"strings"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util" cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/curiosrc/market"
"github.com/filecoin-project/lotus/curiosrc/market/fakelm"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
) )
@ -30,98 +19,8 @@ type MinerStorageService api.StorageMiner
var _ sectorblocks.SectorBuilder = *new(MinerSealingService) var _ sectorblocks.SectorBuilder = *new(MinerSealingService)
func harmonyApiInfoToConf(apiInfo string) (config.HarmonyDB, error) {
hc := config.HarmonyDB{}
// apiInfo - harmony:layer:maddr:user:pass:dbname:host:port
parts := strings.Split(apiInfo, ":")
if len(parts) != 8 {
return config.HarmonyDB{}, xerrors.Errorf("invalid harmonydb info '%s'", apiInfo)
}
hc.Username = parts[3]
hc.Password = parts[4]
hc.Database = parts[5]
hc.Hosts = []string{parts[6]}
hc.Port = parts[7]
return hc, nil
}
func connectHarmony(apiInfo string, fapi v1api.FullNode, mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) {
log.Info("Connecting to harmonydb")
hc, err := harmonyApiInfoToConf(apiInfo)
if err != nil {
return nil, err
}
db, err := harmonydb.NewFromConfig(hc)
if err != nil {
return nil, xerrors.Errorf("connecting to harmonydb: %w", err)
}
parts := strings.Split(apiInfo, ":")
maddr, err := address.NewFromString(parts[2])
if err != nil {
return nil, xerrors.Errorf("parsing miner address: %w", err)
}
pin := market.NewPieceIngester(db, fapi)
si := paths.NewDBIndex(nil, db)
mid, err := address.IDFromAddress(maddr)
if err != nil {
return nil, xerrors.Errorf("getting miner id: %w", err)
}
mi, err := fapi.StateMinerInfo(mctx, maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting miner info: %w", err)
}
lp := fakelm.NewLMRPCProvider(si, fapi, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, parts[1])
ast := api.StorageMinerStruct{}
ast.CommonStruct.Internal.AuthNew = lp.AuthNew
ast.Internal.ActorAddress = lp.ActorAddress
ast.Internal.WorkerJobs = lp.WorkerJobs
ast.Internal.SectorsStatus = lp.SectorsStatus
ast.Internal.SectorsList = lp.SectorsList
ast.Internal.SectorsSummary = lp.SectorsSummary
ast.Internal.SectorsListInStates = lp.SectorsListInStates
ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal
ast.Internal.ComputeDataCid = lp.ComputeDataCid
ast.Internal.SectorAddPieceToAny = func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data, p3 api.PieceDealInfo) (api.SectorOffset, error) {
panic("implement me")
}
ast.Internal.StorageList = si.StorageList
ast.Internal.StorageDetach = si.StorageDetach
ast.Internal.StorageReportHealth = si.StorageReportHealth
ast.Internal.StorageDeclareSector = si.StorageDeclareSector
ast.Internal.StorageDropSector = si.StorageDropSector
ast.Internal.StorageFindSector = si.StorageFindSector
ast.Internal.StorageInfo = si.StorageInfo
ast.Internal.StorageBestAlloc = si.StorageBestAlloc
ast.Internal.StorageLock = si.StorageLock
ast.Internal.StorageTryLock = si.StorageTryLock
ast.Internal.StorageGetLocks = si.StorageGetLocks
return &ast, nil
}
func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) {
if strings.HasPrefix(apiInfo, "harmony:") {
return connectHarmony(apiInfo, fapi, mctx, lc)
}
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
info := cliutil.ParseApiInfo(apiInfo) info := cliutil.ParseApiInfo(apiInfo)
addr, err := info.DialArgs("v0") addr, err := info.DialArgs("v0")